From 5e6643768615be3276a572f1b24473da07d70d49 Mon Sep 17 00:00:00 2001 From: Peter Kelly Date: Fri, 10 Aug 2018 11:49:51 +0100 Subject: [PATCH] More refactoring, mainly tidy up of controller --- .gitignore | 1 + Makefile | 2 +- cmd/{nginx-ic => nginx-ingress}/main.go | 26 +- internal/controller/controller.go | 549 +++++++++++++----------- internal/controller/controller_test.go | 58 +-- internal/controller/utils.go | 22 +- 6 files changed, 346 insertions(+), 312 deletions(-) rename cmd/{nginx-ic => nginx-ingress}/main.go (94%) diff --git a/.gitignore b/.gitignore index d8c50dc81e..820e61d81b 100644 --- a/.gitignore +++ b/.gitignore @@ -24,6 +24,7 @@ Session.vim # Ingress Controller binaries osx-nginx-ingress nginx-ingress +!nginx-ingress/ osx-nginx-plus-ingress nginx-plus-ingress cmd/nginx-ic/nginx-ic diff --git a/Makefile b/Makefile index 91d3cb118d..f8633bad11 100644 --- a/Makefile +++ b/Makefile @@ -5,7 +5,7 @@ TAG = $(VERSION) PREFIX = nginx/nginx-ingress DOCKER_RUN = docker run --rm -v $(shell pwd):/go/src/github.com/nginxinc/kubernetes-ingress -DOCKER_BUILD_RUN = docker run --rm -v $(shell pwd):/go/src/github.com/nginxinc/kubernetes-ingress -w /go/src/github.com/nginxinc/kubernetes-ingress/cmd/nginx-ic/ +DOCKER_BUILD_RUN = docker run --rm -v $(shell pwd):/go/src/github.com/nginxinc/kubernetes-ingress -w /go/src/github.com/nginxinc/kubernetes-ingress/cmd/nginx-ingress/ GOLANG_CONTAINER = golang:1.10 DOCKERFILEPATH = build DOCKERFILE = Dockerfile diff --git a/cmd/nginx-ic/main.go b/cmd/nginx-ingress/main.go similarity index 94% rename from cmd/nginx-ic/main.go rename to cmd/nginx-ingress/main.go index 2ae44e662e..a5c3b33cf4 100644 --- a/cmd/nginx-ic/main.go +++ b/cmd/nginx-ingress/main.go @@ -230,19 +230,19 @@ func main() { controllerNamespace := os.Getenv("POD_NAMESPACE") lbcInput := controller.NewLoadBalancerControllerInput{ - KubeClient: kubeClient, - ResyncPeriod: 30 * time.Second, - Namespace: *watchNamespace, - CNF: cnf, - NginxConfigMaps: *nginxConfigMaps, - DefaultServerSecret: *defaultServerSecret, - NginxPlus: *nginxPlus, - IngressClass: *ingressClass, - UseIngressClassOnly: *useIngressClassOnly, - ExternalServiceName: *externalService, - ControllerNamespace: controllerNamespace, - ReportIngressStatus: *reportIngressStatus, - LeaderElectionEnabled: *leaderElectionEnabled, + KubeClient: kubeClient, + ResyncPeriod: 30 * time.Second, + Namespace: *watchNamespace, + NginxConfigurator: cnf, + NginxConfigMaps: *nginxConfigMaps, + DefaultServerSecret: *defaultServerSecret, + IsNginxPlus: *nginxPlus, + IngressClass: *ingressClass, + UseIngressClassOnly: *useIngressClassOnly, + ExternalServiceName: *externalService, + ControllerNamespace: controllerNamespace, + ReportIngressStatus: *reportIngressStatus, + IsLeaderElectionEnabled: *leaderElectionEnabled, } lbc := controller.NewLoadBalancerController(lbcInput) go handleTermination(lbc, ngxc, nginxDone) diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 9fe5372f4b..09a9a0c7a9 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -49,63 +49,63 @@ const ( // LoadBalancerController watches Kubernetes API and // reconfigures NGINX via NginxController when needed type LoadBalancerController struct { - client kubernetes.Interface - ingController cache.Controller - svcController cache.Controller - endpController cache.Controller - cfgmController cache.Controller - secrController cache.Controller - ingLister StoreToIngressLister - svcLister cache.Store - endpLister StoreToEndpointLister - cfgmLister StoreToConfigMapLister - secrLister StoreToSecretLister - syncQueue *taskQueue - stopCh chan struct{} - cnf *nginx.Configurator - watchNginxConfigMaps bool - nginxPlus bool - recorder record.EventRecorder - defaultServerSecret string - ingressClass string - useIngressClassOnly bool - statusUpdater *StatusUpdater - leaderElector *leaderelection.LeaderElector - reportIngressStatus bool - leaderElectionEnabled bool + client kubernetes.Interface + ingressController cache.Controller + svcController cache.Controller + endpointController cache.Controller + configMapController cache.Controller + secretController cache.Controller + ingressLister StoreToIngressLister + svcLister cache.Store + endpointLister StoreToEndpointLister + configMapLister StoreToConfigMapLister + secretLister StoreToSecretLister + syncQueue *TaskQueue + stop chan struct{} + configurator *nginx.Configurator + watchNginxConfigMaps bool + isNginxPlus bool + recorder record.EventRecorder + defaultServerSecret string + ingressClass string + useIngressClassOnly bool + statusUpdater *StatusUpdater + leaderElector *leaderelection.LeaderElector + reportIngressStatus bool + isLeaderElectionEnabled bool } var keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc // NewLoadBalancerControllerInput holds the input needed to call NewLoadBalancerController. type NewLoadBalancerControllerInput struct { - KubeClient kubernetes.Interface - ResyncPeriod time.Duration - Namespace string - CNF *nginx.Configurator - NginxConfigMaps string - DefaultServerSecret string - NginxPlus bool - IngressClass string - UseIngressClassOnly bool - ExternalServiceName string - ControllerNamespace string - ReportIngressStatus bool - LeaderElectionEnabled bool + KubeClient kubernetes.Interface + ResyncPeriod time.Duration + Namespace string + NginxConfigurator *nginx.Configurator + NginxConfigMaps string + DefaultServerSecret string + IsNginxPlus bool + IngressClass string + UseIngressClassOnly bool + ExternalServiceName string + ControllerNamespace string + ReportIngressStatus bool + IsLeaderElectionEnabled bool } // NewLoadBalancerController creates a controller func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalancerController { lbc := LoadBalancerController{ - client: input.KubeClient, - stopCh: make(chan struct{}), - cnf: input.CNF, - defaultServerSecret: input.DefaultServerSecret, - nginxPlus: input.NginxPlus, - ingressClass: input.IngressClass, - useIngressClassOnly: input.UseIngressClassOnly, - reportIngressStatus: input.ReportIngressStatus, - leaderElectionEnabled: input.LeaderElectionEnabled, + client: input.KubeClient, + stop: make(chan struct{}), + configurator: input.NginxConfigurator, + defaultServerSecret: input.DefaultServerSecret, + isNginxPlus: input.IsNginxPlus, + ingressClass: input.IngressClass, + useIngressClassOnly: input.UseIngressClassOnly, + reportIngressStatus: input.ReportIngressStatus, + isLeaderElectionEnabled: input.IsLeaderElectionEnabled, } eventBroadcaster := record.NewBroadcaster() @@ -120,205 +120,199 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc glog.V(3).Infof("Nginx Ingress Controller has class: %v", input.IngressClass) - ingHandlers := cache.ResourceEventHandlerFuncs{ + ingressHandlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - addIng := obj.(*extensions.Ingress) - if !lbc.isNginxIngress(addIng) { - glog.Infof("Ignoring Ingress %v based on Annotation %v", addIng.Name, ingressClassKey) + ingress := obj.(*extensions.Ingress) + if !lbc.isNginxIngress(ingress) { + glog.Infof("Ignoring Ingress %v based on Annotation %v", ingress.Name, ingressClassKey) return } - glog.V(3).Infof("Adding Ingress: %v", addIng.Name) + glog.V(3).Infof("Adding Ingress: %v", ingress.Name) lbc.syncQueue.enqueue(obj) }, DeleteFunc: func(obj interface{}) { - remIng, isIng := obj.(*extensions.Ingress) + ingress, isIng := obj.(*extensions.Ingress) if !isIng { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { glog.V(3).Infof("Error received unexpected object: %v", obj) return } - remIng, ok = deletedState.Obj.(*extensions.Ingress) + ingress, ok = deletedState.Obj.(*extensions.Ingress) if !ok { glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Ingress object: %v", deletedState.Obj) return } } - if !lbc.isNginxIngress(remIng) { + if !lbc.isNginxIngress(ingress) { return } - if isMinion(remIng) { - master, err := lbc.findMasterForMinion(remIng) + if isMinion(ingress) { + master, err := lbc.findMasterForMinion(ingress) if err != nil { - glog.Infof("Ignoring Ingress %v(Minion): %v", remIng.Name, err) + glog.Infof("Ignoring Ingress %v(Minion): %v", ingress.Name, err) return } - glog.V(3).Infof("Removing Ingress: %v(Minion) for %v(Master)", remIng.Name, master.Name) + glog.V(3).Infof("Removing Ingress: %v(Minion) for %v(Master)", ingress.Name, master.Name) lbc.syncQueue.enqueue(master) } else { - glog.V(3).Infof("Removing Ingress: %v", remIng.Name) + glog.V(3).Infof("Removing Ingress: %v", ingress.Name) lbc.syncQueue.enqueue(obj) } }, - UpdateFunc: func(old, cur interface{}) { - curIng := cur.(*extensions.Ingress) - oldIng := old.(*extensions.Ingress) - if !lbc.isNginxIngress(curIng) { + UpdateFunc: func(old, current interface{}) { + c := current.(*extensions.Ingress) + o := old.(*extensions.Ingress) + if !lbc.isNginxIngress(c) { return } - if hasChanges(oldIng, curIng) { - glog.V(3).Infof("Ingress %v changed, syncing", curIng.Name) - lbc.syncQueue.enqueue(cur) + if hasChanges(o, c) { + glog.V(3).Infof("Ingress %v changed, syncing", c.Name) + lbc.syncQueue.enqueue(c) } }, } - lbc.ingLister.Store, lbc.ingController = cache.NewInformer( - cache.NewListWatchFromClient(lbc.client.Extensions().RESTClient(), "ingresses", input.Namespace, fields.Everything()), - &extensions.Ingress{}, input.ResyncPeriod, ingHandlers) - - // statusUpdater requires ingLister to be instantiated, above. - lbc.statusUpdater = &StatusUpdater{ - client: input.KubeClient, - namespace: input.ControllerNamespace, - externalServiceName: input.ExternalServiceName, - ingLister: &lbc.ingLister, - keyFunc: keyFunc, - } - - if input.ReportIngressStatus && input.LeaderElectionEnabled { - leaderCallbacks := leaderelection.LeaderCallbacks{ - OnStartedLeading: func(stop <-chan struct{}) { - glog.V(3).Info("started leading, updating ingress status") - ingresses, mergeableIngresses := lbc.getManagedIngresses() - err := lbc.statusUpdater.UpdateManagedAndMergeableIngresses(ingresses, mergeableIngresses) - if err != nil { - glog.V(3).Infof("error updating status when starting leading: %v", err) - } - }, - } - var err error - lbc.leaderElector, err = NewLeaderElector(input.KubeClient, leaderCallbacks, input.ControllerNamespace) - if err != nil { - glog.V(3).Infof("Error starting LeaderElection: %v", err) - } - } + lbc.ingressLister.Store, lbc.ingressController = cache.NewInformer( + cache.NewListWatchFromClient( + lbc.client.Extensions().RESTClient(), + "ingresses", + input.Namespace, + fields.Everything()), + &extensions.Ingress{}, + input.ResyncPeriod, + ingressHandlers, + ) svcHandlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - addSvc := obj.(*api_v1.Service) - if lbc.isExternalServiceForStatus(addSvc) { - lbc.syncQueue.enqueue(addSvc) + svc := obj.(*api_v1.Service) + if lbc.isExternalServiceForStatus(svc) { + lbc.syncQueue.enqueue(svc) return } - glog.V(3).Infof("Adding service: %v", addSvc.Name) - lbc.enqueueIngressForService(addSvc) + glog.V(3).Infof("Adding service: %v", svc.Name) + lbc.enqueueIngressForService(svc) }, DeleteFunc: func(obj interface{}) { - remSvc, isSvc := obj.(*api_v1.Service) + svc, isSvc := obj.(*api_v1.Service) if !isSvc { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { glog.V(3).Infof("Error received unexpected object: %v", obj) return } - remSvc, ok = deletedState.Obj.(*api_v1.Service) + svc, ok = deletedState.Obj.(*api_v1.Service) if !ok { glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Service object: %v", deletedState.Obj) return } } - if lbc.isExternalServiceForStatus(remSvc) { - lbc.syncQueue.enqueue(remSvc) + if lbc.isExternalServiceForStatus(svc) { + lbc.syncQueue.enqueue(svc) return } - glog.V(3).Infof("Removing service: %v", remSvc.Name) - lbc.enqueueIngressForService(remSvc) + glog.V(3).Infof("Removing service: %v", svc.Name) + lbc.enqueueIngressForService(svc) }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { - curSvc := cur.(*api_v1.Service) - if lbc.isExternalServiceForStatus(curSvc) { - lbc.syncQueue.enqueue(curSvc) + svc := cur.(*api_v1.Service) + if lbc.isExternalServiceForStatus(svc) { + lbc.syncQueue.enqueue(svc) return } - glog.V(3).Infof("Service %v changed, syncing", curSvc.Name) - lbc.enqueueIngressForService(curSvc) + glog.V(3).Infof("Service %v changed, syncing", svc.Name) + lbc.enqueueIngressForService(svc) } }, } lbc.svcLister, lbc.svcController = cache.NewInformer( - cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "services", input.Namespace, fields.Everything()), - &api_v1.Service{}, input.ResyncPeriod, svcHandlers) - - endpHandlers := cache.ResourceEventHandlerFuncs{ + cache.NewListWatchFromClient( + lbc.client.Core().RESTClient(), + "services", + input.Namespace, + fields.Everything()), + &api_v1.Service{}, + input.ResyncPeriod, + svcHandlers, + ) + + endpointHandlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - addEndp := obj.(*api_v1.Endpoints) - glog.V(3).Infof("Adding endpoints: %v", addEndp.Name) + endpoint := obj.(*api_v1.Endpoints) + glog.V(3).Infof("Adding endpoints: %v", endpoint.Name) lbc.syncQueue.enqueue(obj) }, DeleteFunc: func(obj interface{}) { - remEndp, isEndp := obj.(*api_v1.Endpoints) - if !isEndp { + endpoint, isEndpoint := obj.(*api_v1.Endpoints) + if !isEndpoint { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { glog.V(3).Infof("Error received unexpected object: %v", obj) return } - remEndp, ok = deletedState.Obj.(*api_v1.Endpoints) + endpoint, ok = deletedState.Obj.(*api_v1.Endpoints) if !ok { glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Endpoints object: %v", deletedState.Obj) return } } - glog.V(3).Infof("Removing endpoints: %v", remEndp.Name) + glog.V(3).Infof("Removing endpoints: %v", endpoint.Name) lbc.syncQueue.enqueue(obj) }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { - glog.V(3).Infof("Endpoints %v changed, syncing", - cur.(*api_v1.Endpoints).Name) + glog.V(3).Infof("Endpoints %v changed, syncing", cur.(*api_v1.Endpoints).Name) lbc.syncQueue.enqueue(cur) } }, } - lbc.endpLister.Store, lbc.endpController = cache.NewInformer( - cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "endpoints", input.Namespace, fields.Everything()), - &api_v1.Endpoints{}, input.ResyncPeriod, endpHandlers) - secrHandlers := cache.ResourceEventHandlerFuncs{ + lbc.endpointLister.Store, lbc.endpointController = cache.NewInformer( + cache.NewListWatchFromClient( + lbc.client.Core().RESTClient(), + "endpoints", + input.Namespace, + fields.Everything()), + &api_v1.Endpoints{}, + input.ResyncPeriod, + endpointHandlers, + ) + + secretHandlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - secr := obj.(*api_v1.Secret) - if err := lbc.ValidateSecret(secr); err != nil { + secret := obj.(*api_v1.Secret) + if err := lbc.ValidateSecret(secret); err != nil { return } - nsname := secr.Namespace + "/" + secr.Name + nsname := secret.Namespace + "/" + secret.Name if nsname == lbc.defaultServerSecret { - glog.V(3).Infof("Adding default server Secret: %v", secr.Name) + glog.V(3).Infof("Adding default server Secret: %v", secret.Name) lbc.syncQueue.enqueue(obj) } }, DeleteFunc: func(obj interface{}) { - remSecr, isSecr := obj.(*api_v1.Secret) + secret, isSecr := obj.(*api_v1.Secret) if !isSecr { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { glog.V(3).Infof("Error received unexpected object: %v", obj) return } - remSecr, ok = deletedState.Obj.(*api_v1.Secret) + secret, ok = deletedState.Obj.(*api_v1.Secret) if !ok { glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-Secret object: %v", deletedState.Obj) return } } - if err := lbc.ValidateSecret(remSecr); err != nil { + if err := lbc.ValidateSecret(secret); err != nil { return } - glog.V(3).Infof("Removing Secret: %v", remSecr.Name) + glog.V(3).Infof("Removing Secret: %v", secret.Name) lbc.syncQueue.enqueue(obj) }, UpdateFunc: func(old, cur interface{}) { @@ -329,16 +323,22 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc } if !reflect.DeepEqual(old, cur) { - glog.V(3).Infof("Secret %v changed, syncing", - cur.(*api_v1.Secret).Name) + glog.V(3).Infof("Secret %v changed, syncing", cur.(*api_v1.Secret).Name) lbc.syncQueue.enqueue(cur) } }, } - lbc.secrLister.Store, lbc.secrController = cache.NewInformer( - cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "secrets", input.Namespace, fields.Everything()), - &api_v1.Secret{}, input.ResyncPeriod, secrHandlers) + lbc.secretLister.Store, lbc.secretController = cache.NewInformer( + cache.NewListWatchFromClient( + lbc.client.Core().RESTClient(), + "secrets", + input.Namespace, + fields.Everything()), + &api_v1.Secret{}, + input.ResyncPeriod, + secretHandlers, + ) if input.NginxConfigMaps != "" { nginxConfigMapsNS, nginxConfigMapsName, err := ParseNamespaceName(input.NginxConfigMaps) @@ -347,47 +347,82 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc } else { lbc.watchNginxConfigMaps = true - cfgmHandlers := cache.ResourceEventHandlerFuncs{ + configMapHandlers := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - cfgm := obj.(*api_v1.ConfigMap) - if cfgm.Name == nginxConfigMapsName { - glog.V(3).Infof("Adding ConfigMap: %v", cfgm.Name) + configMap := obj.(*api_v1.ConfigMap) + if configMap.Name == nginxConfigMapsName { + glog.V(3).Infof("Adding ConfigMap: %v", configMap.Name) lbc.syncQueue.enqueue(obj) } }, DeleteFunc: func(obj interface{}) { - cfgm, isCfgm := obj.(*api_v1.ConfigMap) - if !isCfgm { + configMap, isConfigMap := obj.(*api_v1.ConfigMap) + if !isConfigMap { deletedState, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { glog.V(3).Infof("Error received unexpected object: %v", obj) return } - cfgm, ok = deletedState.Obj.(*api_v1.ConfigMap) + configMap, ok = deletedState.Obj.(*api_v1.ConfigMap) if !ok { glog.V(3).Infof("Error DeletedFinalStateUnknown contained non-ConfigMap object: %v", deletedState.Obj) return } } - if cfgm.Name == nginxConfigMapsName { - glog.V(3).Infof("Removing ConfigMap: %v", cfgm.Name) + if configMap.Name == nginxConfigMapsName { + glog.V(3).Infof("Removing ConfigMap: %v", configMap.Name) lbc.syncQueue.enqueue(obj) } }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { - cfgm := cur.(*api_v1.ConfigMap) - if cfgm.Name == nginxConfigMapsName { - glog.V(3).Infof("ConfigMap %v changed, syncing", - cur.(*api_v1.ConfigMap).Name) + configMap := cur.(*api_v1.ConfigMap) + if configMap.Name == nginxConfigMapsName { + glog.V(3).Infof("ConfigMap %v changed, syncing", cur.(*api_v1.ConfigMap).Name) lbc.syncQueue.enqueue(cur) } } }, } - lbc.cfgmLister.Store, lbc.cfgmController = cache.NewInformer( - cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "configmaps", nginxConfigMapsNS, fields.Everything()), - &api_v1.ConfigMap{}, input.ResyncPeriod, cfgmHandlers) + + lbc.configMapLister.Store, lbc.configMapController = cache.NewInformer( + cache.NewListWatchFromClient( + lbc.client.Core().RESTClient(), + "configmaps", + nginxConfigMapsNS, + fields.Everything()), + &api_v1.ConfigMap{}, + input.ResyncPeriod, + configMapHandlers, + ) + } + } + + // statusUpdater requires ingLister to be instantiated, above. + lbc.statusUpdater = &StatusUpdater{ + client: input.KubeClient, + namespace: input.ControllerNamespace, + externalServiceName: input.ExternalServiceName, + ingLister: &lbc.ingressLister, + keyFunc: keyFunc, + } + + if input.ReportIngressStatus && input.IsLeaderElectionEnabled { + leaderCallbacks := leaderelection.LeaderCallbacks{ + OnStartedLeading: func(stop <-chan struct{}) { + glog.V(3).Info("started leading, updating ingress status") + ingresses, mergeableIngresses := lbc.getManagedIngresses() + err := lbc.statusUpdater.UpdateManagedAndMergeableIngresses(ingresses, mergeableIngresses) + if err != nil { + glog.V(3).Infof("error updating status when starting leading: %v", err) + } + }, + } + + var err error + lbc.leaderElector, err = NewLeaderElector(input.KubeClient, leaderCallbacks, input.ControllerNamespace) + if err != nil { + glog.V(3).Infof("Error starting LeaderElection: %v", err) } } @@ -395,10 +430,10 @@ func NewLoadBalancerController(input NewLoadBalancerControllerInput) *LoadBalanc } // hasChanges ignores Status or ResourceVersion changes -func hasChanges(oldIng *extensions.Ingress, curIng *extensions.Ingress) bool { - oldIng.Status.LoadBalancer.Ingress = curIng.Status.LoadBalancer.Ingress - oldIng.ResourceVersion = curIng.ResourceVersion - return !reflect.DeepEqual(oldIng, curIng) +func hasChanges(old *extensions.Ingress, current *extensions.Ingress) bool { + old.Status.LoadBalancer.Ingress = current.Status.LoadBalancer.Ingress + old.ResourceVersion = current.ResourceVersion + return !reflect.DeepEqual(old, current) } // Run starts the loadbalancer controller @@ -406,29 +441,29 @@ func (lbc *LoadBalancerController) Run() { if lbc.leaderElector != nil { go lbc.leaderElector.Run() } - go lbc.svcController.Run(lbc.stopCh) - go lbc.endpController.Run(lbc.stopCh) - go lbc.secrController.Run(lbc.stopCh) + go lbc.svcController.Run(lbc.stop) + go lbc.endpointController.Run(lbc.stop) + go lbc.secretController.Run(lbc.stop) if lbc.watchNginxConfigMaps { - go lbc.cfgmController.Run(lbc.stopCh) + go lbc.configMapController.Run(lbc.stop) } - go lbc.ingController.Run(lbc.stopCh) - go lbc.syncQueue.run(time.Second, lbc.stopCh) - <-lbc.stopCh + go lbc.ingressController.Run(lbc.stop) + go lbc.syncQueue.run(time.Second, lbc.stop) + <-lbc.stop } // Stop shutdowns the load balancer controller func (lbc *LoadBalancerController) Stop() { - close(lbc.stopCh) + close(lbc.stop) lbc.syncQueue.shutdown() } -func (lbc *LoadBalancerController) syncEndp(task Task) { +func (lbc *LoadBalancerController) syncEndpoint(task Task) { key := task.Key glog.V(3).Infof("Syncing endpoints %v", key) - obj, endpExists, err := lbc.endpLister.GetByKey(key) + obj, endpExists, err := lbc.endpointLister.GetByKey(key) if err != nil { lbc.syncQueue.requeue(task, err) return @@ -447,7 +482,7 @@ func (lbc *LoadBalancerController) syncEndp(task Task) { glog.Errorf("Ignoring Ingress %v(Minion): %v", ing.Name, err) continue } - if !lbc.cnf.HasIngress(master) { + if !lbc.configurator.HasIngress(master) { continue } mergeableIngresses, err := lbc.createMergableIngresses(master) @@ -457,13 +492,13 @@ func (lbc *LoadBalancerController) syncEndp(task Task) { } glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Namespace, ing.Name) - err = lbc.cnf.UpdateEndpointsMergeableIngress(mergeableIngresses) + err = lbc.configurator.UpdateEndpointsMergeableIngress(mergeableIngresses) if err != nil { glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err) } continue } - if !lbc.cnf.HasIngress(&ing) { + if !lbc.configurator.HasIngress(&ing) { continue } ingEx, err := lbc.createIngress(&ing) @@ -472,7 +507,7 @@ func (lbc *LoadBalancerController) syncEndp(task Task) { continue } glog.V(3).Infof("Updating Endpoints for %v/%v", ing.Namespace, ing.Name) - err = lbc.cnf.UpdateEndpoints(ingEx) + err = lbc.configurator.UpdateEndpoints(ingEx) if err != nil { glog.Errorf("Error updating endpoints for %v/%v: %v", ing.Namespace, ing.Name, err) } @@ -480,20 +515,20 @@ func (lbc *LoadBalancerController) syncEndp(task Task) { } } -func (lbc *LoadBalancerController) syncCfgm(task Task) { +func (lbc *LoadBalancerController) syncConfig(task Task) { key := task.Key glog.V(3).Infof("Syncing configmap %v", key) - obj, cfgmExists, err := lbc.cfgmLister.GetByKey(key) + obj, configExists, err := lbc.configMapLister.GetByKey(key) if err != nil { lbc.syncQueue.requeue(task, err) return } cfg := nginx.NewDefaultConfig() - if cfgmExists { + if configExists { cfgm := obj.(*api_v1.ConfigMap) - cfg = nginx.ParseConfigMap(cfgm, lbc.nginxPlus) + cfg = nginx.ParseConfigMap(cfgm, lbc.isNginxPlus) lbc.statusUpdater.SaveStatusFromExternalStatus(cfgm.Data["external-status-address"]) } @@ -508,38 +543,31 @@ func (lbc *LoadBalancerController) syncCfgm(task Task) { } } - if err := lbc.cnf.UpdateConfig(cfg, ingExes, mergeableIngresses); err != nil { - if cfgmExists { - cfgm := obj.(*api_v1.ConfigMap) - lbc.recorder.Eventf(cfgm, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration from %v was updated, but not applied: %v", key, err) - } - for _, ingEx := range ingExes { - lbc.recorder.Eventf(ingEx.Ingress, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration for %v/%v was updated, but not applied: %v", - ingEx.Ingress.Namespace, ingEx.Ingress.Name, err) - } - for _, mergeableIng := range mergeableIngresses { - master := mergeableIng.Master - lbc.recorder.Eventf(master.Ingress, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration for %v/%v(Master) was updated, but not applied: %v", - master.Ingress.Namespace, master.Ingress.Name, err) - for _, minion := range mergeableIng.Minions { - lbc.recorder.Eventf(minion.Ingress, api_v1.EventTypeWarning, "UpdatedWithError", "Configuration for %v/%v(Minion) was updated, but not applied: %v", - minion.Ingress.Namespace, minion.Ingress.Name, err) - } - } - } else { - if cfgmExists { - cfgm := obj.(*api_v1.ConfigMap) - lbc.recorder.Eventf(cfgm, api_v1.EventTypeNormal, "Updated", "Configuration from %v was updated", key) - } - for _, ingEx := range ingExes { - lbc.recorder.Eventf(ingEx.Ingress, api_v1.EventTypeNormal, "Updated", "Configuration for %v/%v was updated", ingEx.Ingress.Namespace, ingEx.Ingress.Name) - } - for _, mergeableIng := range mergeableIngresses { - master := mergeableIng.Master - lbc.recorder.Eventf(master.Ingress, api_v1.EventTypeWarning, "Updated", "Configuration for %v/%v(Master) was updated", master.Ingress.Namespace, master.Ingress.Name) - for _, minion := range mergeableIng.Minions { - lbc.recorder.Eventf(minion.Ingress, api_v1.EventTypeWarning, "Updated", "Configuration for %v/%v(Minion) was updated", minion.Ingress.Namespace, minion.Ingress.Name) - } + updateErr := lbc.configurator.UpdateConfig(cfg, ingExes, mergeableIngresses) + + eventTitle := "Updated" + eventType := api_v1.EventTypeNormal + eventWarningMessage := "" + + if updateErr != nil { + eventTitle = "UpdatedWithError" + eventType = api_v1.EventTypeWarning + eventWarningMessage = "but was not applied" + } + if configExists { + cfgm := obj.(*api_v1.ConfigMap) + lbc.recorder.Eventf(cfgm, eventType, eventTitle, "Configuration from %v was updated %s: %v", key, eventWarningMessage, err) + } + for _, ingEx := range ingExes { + lbc.recorder.Eventf(ingEx.Ingress, eventType, eventTitle, "Configuration for %v/%v was updated %s: %v", + ingEx.Ingress.Namespace, ingEx.Ingress.Name, eventWarningMessage, err) + } + for _, mergeableIng := range mergeableIngresses { + master := mergeableIng.Master + lbc.recorder.Eventf(master.Ingress, eventType, eventTitle, "Configuration for %v/%v(Master) was updated %s: %v", master.Ingress.Namespace, master.Ingress.Name, eventWarningMessage, err) + for _, minion := range mergeableIng.Minions { + lbc.recorder.Eventf(minion.Ingress, eventType, eventTitle, "Configuration for %v/%v(Minion) was updated %s: %v", + minion.Ingress.Namespace, minion.Ingress.Name, eventWarningMessage, err) } } } @@ -548,7 +576,7 @@ func (lbc *LoadBalancerController) syncCfgm(task Task) { func (lbc *LoadBalancerController) getManagedIngresses() ([]extensions.Ingress, map[string]*nginx.MergeableIngresses) { mergeableIngresses := make(map[string]*nginx.MergeableIngresses) var managedIngresses []extensions.Ingress - ings, _ := lbc.ingLister.List() + ings, _ := lbc.ingressLister.List() for i := range ings.Items { ing := ings.Items[i] if !lbc.isNginxIngress(&ing) { @@ -560,7 +588,7 @@ func (lbc *LoadBalancerController) getManagedIngresses() ([]extensions.Ingress, glog.Errorf("Ignoring Ingress %v(Minion): %v", ing, err) continue } - if !lbc.cnf.HasIngress(master) { + if !lbc.configurator.HasIngress(master) { continue } if _, exists := mergeableIngresses[master.Name]; !exists { @@ -573,7 +601,7 @@ func (lbc *LoadBalancerController) getManagedIngresses() ([]extensions.Ingress, } continue } - if !lbc.cnf.HasIngress(&ing) { + if !lbc.configurator.HasIngress(&ing) { continue } managedIngresses = append(managedIngresses, ing) @@ -602,10 +630,10 @@ func (lbc *LoadBalancerController) sync(task Task) { case IngressMinion: lbc.syncIngMinion(task) case ConfigMap: - lbc.syncCfgm(task) + lbc.syncConfig(task) return case Endpoints: - lbc.syncEndp(task) + lbc.syncEndpoint(task) return case Secret: lbc.syncSecret(task) @@ -617,7 +645,7 @@ func (lbc *LoadBalancerController) sync(task Task) { func (lbc *LoadBalancerController) syncIngMinion(task Task) { key := task.Key - obj, ingExists, err := lbc.ingLister.Store.GetByKey(key) + obj, ingExists, err := lbc.ingressLister.Store.GetByKey(key) if err != nil { lbc.syncQueue.requeue(task, err) return @@ -640,7 +668,7 @@ func (lbc *LoadBalancerController) syncIngMinion(task Task) { _, err = lbc.createIngress(minion) if err != nil { lbc.syncQueue.requeueAfter(task, err, 5*time.Second) - if !lbc.cnf.HasMinion(master, minion) { + if !lbc.configurator.HasMinion(master, minion) { return } } @@ -650,7 +678,7 @@ func (lbc *LoadBalancerController) syncIngMinion(task Task) { func (lbc *LoadBalancerController) syncIng(task Task) { key := task.Key - ing, ingExists, err := lbc.ingLister.GetByKeySafe(key) + ing, ingExists, err := lbc.ingressLister.GetByKeySafe(key) if err != nil { lbc.syncQueue.requeue(task, err) return @@ -659,7 +687,7 @@ func (lbc *LoadBalancerController) syncIng(task Task) { if !ingExists { glog.V(2).Infof("Deleting Ingress: %v\n", key) - err := lbc.cnf.DeleteIngress(key) + err := lbc.configurator.DeleteIngress(key) if err != nil { glog.Errorf("Error when deleting configuration for %v: %v", key, err) } @@ -679,22 +707,27 @@ func (lbc *LoadBalancerController) syncIng(task Task) { } return } - err = lbc.cnf.AddOrUpdateMergeableIngress(mergeableIngExs) - if err != nil { - lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "AddedOrUpdatedWithError", "Configuration for %v(Master) was added or updated, but not applied: %v", key, err) - for _, minion := range mergeableIngExs.Minions { - lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "AddedOrUpdatedWithError", "Configuration for %v/%v(Minion) was added or updated, but not applied: %v", minion.Ingress.Namespace, minion.Ingress.Name, err) - } - } else { - lbc.recorder.Eventf(ing, api_v1.EventTypeNormal, "AddedOrUpdated", "Configuration for %v(Master) was added or updated", key) - for _, minion := range mergeableIngExs.Minions { - lbc.recorder.Eventf(ing, api_v1.EventTypeNormal, "AddedOrUpdated", "Configuration for %v/%v(Minion) was added or updated", minion.Ingress.Namespace, minion.Ingress.Name) - } + addErr := lbc.configurator.AddOrUpdateMergeableIngress(mergeableIngExs) + + // record correct eventType and message depending on the error + eventTitle := "AddedOrUpdated" + eventType := api_v1.EventTypeNormal + eventWarningMessage := "" + + if addErr != nil { + eventTitle = "AddedOrUpdatedWithError" + eventType = api_v1.EventTypeWarning + eventWarningMessage = "but was not applied" + } + lbc.recorder.Eventf(ing, eventType, eventTitle, "Configuration for %v(Master) was added or updated %s: %v", key, eventWarningMessage, err) + for _, minion := range mergeableIngExs.Minions { + lbc.recorder.Eventf(ing, eventType, eventTitle, "Configuration for %v/%v(Minion) was added or updated %s: %v", minion.Ingress.Namespace, minion.Ingress.Name, eventWarningMessage, err) } + if lbc.reportStatusEnabled() { err = lbc.statusUpdater.UpdateMergableIngresses(mergeableIngExs) if err != nil { - glog.V(3).Infof("error updating ing status: %v", err) + glog.V(3).Infof("error updating ingress status: %v", err) } } return @@ -712,7 +745,7 @@ func (lbc *LoadBalancerController) syncIng(task Task) { return } - err = lbc.cnf.AddOrUpdateIngress(ingEx) + err = lbc.configurator.AddOrUpdateIngress(ingEx) if err != nil { lbc.recorder.Eventf(ing, api_v1.EventTypeWarning, "AddedOrUpdatedWithError", "Configuration for %v was added or updated, but not applied: %v", key, err) } else { @@ -760,7 +793,7 @@ func (lbc *LoadBalancerController) isExternalServiceForStatus(svc *api_v1.Servic // reportStatusEnabled determines if we should attempt to report status func (lbc *LoadBalancerController) reportStatusEnabled() bool { if lbc.reportIngressStatus { - if lbc.leaderElectionEnabled { + if lbc.isLeaderElectionEnabled { return lbc.leaderElector != nil && lbc.leaderElector.IsLeader() } return true @@ -770,7 +803,7 @@ func (lbc *LoadBalancerController) reportStatusEnabled() bool { func (lbc *LoadBalancerController) syncSecret(task Task) { key := task.Key - obj, secrExists, err := lbc.secrLister.Store.GetByKey(key) + obj, secrExists, err := lbc.secretLister.Store.GetByKey(key) if err != nil { lbc.syncQueue.requeue(task, err) return @@ -804,7 +837,7 @@ func (lbc *LoadBalancerController) syncSecret(task Task) { glog.Errorf("Ignoring Ingress %v(Minion): %v", minion.Name, err) continue } - err = lbc.cnf.AddOrUpdateMergeableIngress(mergeableIngress) + err = lbc.configurator.AddOrUpdateMergeableIngress(mergeableIngress) if err != nil { glog.Errorf("Failed to update Ingress %v(Master) of %v(Minion): %v", master.Name, minion.Name, err) } @@ -813,7 +846,7 @@ func (lbc *LoadBalancerController) syncSecret(task Task) { lbc.syncQueue.enqueue(&minion) } - if err := lbc.cnf.DeleteSecret(key, nonMinions); err != nil { + if err := lbc.configurator.DeleteSecret(key, nonMinions); err != nil { glog.Errorf("Error when deleting Secret: %v: %v", key, err) } @@ -836,7 +869,7 @@ func (lbc *LoadBalancerController) syncSecret(task Task) { glog.Errorf("Couldn't validate the default server Secret %v: %v", key, err) lbc.recorder.Eventf(secret, api_v1.EventTypeWarning, "Rejected", "the default server Secret %v was rejected, using the previous version: %v", key, err) } else { - err := lbc.cnf.AddOrUpdateDefaultServerTLSSecret(secret) + err := lbc.configurator.AddOrUpdateDefaultServerTLSSecret(secret) if err != nil { glog.Errorf("Error when updating the default server Secret %v: %v", key, err) lbc.recorder.Eventf(secret, api_v1.EventTypeWarning, "UpdatedWithError", "the default server Secret %v was updated, but not applied: %v", key, err) @@ -864,7 +897,7 @@ func (lbc *LoadBalancerController) syncSecret(task Task) { glog.Errorf("Ignoring Ingress %v(Minion): %v", minion.Name, err) continue } - err = lbc.cnf.AddOrUpdateMergeableIngress(mergeableIngress) + err = lbc.configurator.AddOrUpdateMergeableIngress(mergeableIngress) if err != nil { glog.Errorf("Failed to update Ingress %v(Master) of %v(Minion): %v", master.Name, minion.Name, err) } @@ -873,7 +906,7 @@ func (lbc *LoadBalancerController) syncSecret(task Task) { lbc.syncQueue.enqueue(&minion) } - if err := lbc.cnf.DeleteSecret(key, nonMinions); err != nil { + if err := lbc.configurator.DeleteSecret(key, nonMinions); err != nil { glog.Errorf("Error when deleting Secret: %v: %v", key, err) } for _, ing := range nonMinions { @@ -884,7 +917,7 @@ func (lbc *LoadBalancerController) syncSecret(task Task) { return } - if err := lbc.cnf.AddOrUpdateSecret(secret); err != nil { + if err := lbc.configurator.AddOrUpdateSecret(secret); err != nil { glog.Errorf("Error when updating Secret %v: %v", key, err) lbc.recorder.Eventf(secret, api_v1.EventTypeWarning, "UpdatedWithError", "%v was updated, but not applied: %v", key, err) for _, ing := range nonMinions { @@ -919,7 +952,7 @@ func (lbc *LoadBalancerController) syncSecret(task Task) { } func (lbc *LoadBalancerController) findIngressesForSecret(secretNamespace string, secretName string) (nonMinions []extensions.Ingress, minions []extensions.Ingress, error error) { - ings, err := lbc.ingLister.List() + ings, err := lbc.ingressLister.List() if err != nil { return nil, nil, fmt.Errorf("Couldn't get the list of Ingress resources: %v", err) } @@ -935,7 +968,7 @@ items: } if !isMinion(&ing) { - if !lbc.cnf.HasIngress(&ing) { + if !lbc.configurator.HasIngress(&ing) { continue } for _, tls := range ing.Spec.TLS { @@ -944,7 +977,7 @@ items: continue items } } - if lbc.nginxPlus { + if lbc.isNginxPlus { if jwtKey, exists := ing.Annotations[nginx.JWTKeyAnnotation]; exists { if jwtKey == secretName { nonMinions = append(nonMinions, ing) @@ -956,14 +989,14 @@ items: // we're dealing with a minion // minions can only have JWT secrets - if lbc.nginxPlus { + if lbc.isNginxPlus { master, err := lbc.findMasterForMinion(&ing) if err != nil { glog.Infof("Ignoring Ingress %v(Minion): %v", ing.Name, err) continue } - if !lbc.cnf.HasMinion(master, &ing) { + if !lbc.configurator.HasMinion(master, &ing) { continue } @@ -992,7 +1025,7 @@ func (lbc *LoadBalancerController) enqueueIngressForService(svc *api_v1.Service) } ing = *master } - if !lbc.cnf.HasIngress(&ing) { + if !lbc.configurator.HasIngress(&ing) { continue } lbc.syncQueue.enqueue(&ing) @@ -1001,7 +1034,7 @@ func (lbc *LoadBalancerController) enqueueIngressForService(svc *api_v1.Service) } func (lbc *LoadBalancerController) getIngressesForService(svc *api_v1.Service) []extensions.Ingress { - ings, err := lbc.ingLister.GetServiceIngress(svc) + ings, err := lbc.ingressLister.GetServiceIngress(svc) if err != nil { glog.V(3).Infof("ignoring service %v: %v", svc.Name, err) return nil @@ -1043,7 +1076,7 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) (*ngin ingEx.TLSSecrets[secretName] = secret } - if lbc.nginxPlus { + if lbc.isNginxPlus { if jwtKey, exists := ingEx.Ingress.Annotations[nginx.JWTKeyAnnotation]; exists { secretName := jwtKey @@ -1072,7 +1105,7 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) (*ngin } else { ingEx.Endpoints[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = endps } - if lbc.nginxPlus && lbc.isHealthCheckEnabled(ing) { + if lbc.isNginxPlus && lbc.isHealthCheckEnabled(ing) { healthCheck := lbc.getHealthChecksForIngressBackend(ing.Spec.Backend, ing.Namespace) if healthCheck != nil { ingEx.HealthChecks[ing.Spec.Backend.ServiceName+ing.Spec.Backend.ServicePort.String()] = healthCheck @@ -1098,7 +1131,7 @@ func (lbc *LoadBalancerController) createIngress(ing *extensions.Ingress) (*ngin } else { ingEx.Endpoints[path.Backend.ServiceName+path.Backend.ServicePort.String()] = endps } - if lbc.nginxPlus && lbc.isHealthCheckEnabled(ing) { + if lbc.isNginxPlus && lbc.isHealthCheckEnabled(ing) { // Pull active health checks from k8 api healthCheck := lbc.getHealthChecksForIngressBackend(&path.Backend, ing.Namespace) if healthCheck != nil { @@ -1180,7 +1213,7 @@ func (lbc *LoadBalancerController) getEndpointsForIngressBackend(backend *extens return nil, err } - endps, err := lbc.endpLister.GetServiceEndpoints(svc) + endps, err := lbc.endpointLister.GetServiceEndpoints(svc) if err != nil { glog.V(3).Infof("Error getting endpoints for service %s from the cache: %v", svc.Name, err) return nil, err @@ -1319,7 +1352,7 @@ func (lbc *LoadBalancerController) isHealthCheckEnabled(ing *extensions.Ingress) // For NGINX Plus, it also checks if the secret follows the JWK Secret format. func (lbc *LoadBalancerController) ValidateSecret(secret *api_v1.Secret) error { err1 := nginx.ValidateTLSSecret(secret) - if !lbc.nginxPlus { + if !lbc.isNginxPlus { return err1 } @@ -1334,7 +1367,7 @@ func (lbc *LoadBalancerController) ValidateSecret(secret *api_v1.Secret) error { // getMinionsForHost returns a list of all minion ingress resources for a given master func (lbc *LoadBalancerController) getMinionsForMaster(master *nginx.IngressEx) ([]*nginx.IngressEx, error) { - ings, err := lbc.ingLister.List() + ings, err := lbc.ingressLister.List() if err != nil { return []*nginx.IngressEx{}, err } @@ -1347,7 +1380,7 @@ func (lbc *LoadBalancerController) getMinionsForMaster(master *nginx.IngressEx) var minions []*nginx.IngressEx var minionPaths = make(map[string]*extensions.Ingress) - for i, _ := range ings.Items { + for i := range ings.Items { if !lbc.isNginxIngress(&ings.Items[i]) { continue } @@ -1396,16 +1429,16 @@ func (lbc *LoadBalancerController) getMinionsForMaster(master *nginx.IngressEx) // findMasterForHost returns a master for a given minion func (lbc *LoadBalancerController) findMasterForMinion(minion *extensions.Ingress) (*extensions.Ingress, error) { - ings, err := lbc.ingLister.List() + ings, err := lbc.ingressLister.List() if err != nil { return &extensions.Ingress{}, err } - for i, _ := range ings.Items { + for i := range ings.Items { if !lbc.isNginxIngress(&ings.Items[i]) { continue } - if !lbc.cnf.HasIngress(&ings.Items[i]) { + if !lbc.configurator.HasIngress(&ings.Items[i]) { continue } if !isMaster(&ings.Items[i]) { diff --git a/internal/controller/controller_test.go b/internal/controller/controller_test.go index 64f2697e9f..665c5db487 100644 --- a/internal/controller/controller_test.go +++ b/internal/controller/controller_test.go @@ -159,9 +159,9 @@ func TestIsNginxIngress(t *testing.T) { func TestCreateMergableIngresses(t *testing.T) { cafeMaster, coffeeMinion, teaMinion, lbc := getMergableDefaults() - lbc.ingLister.Add(&cafeMaster) - lbc.ingLister.Add(&coffeeMinion) - lbc.ingLister.Add(&teaMinion) + lbc.ingressLister.Add(&cafeMaster) + lbc.ingressLister.Add(&coffeeMinion) + lbc.ingressLister.Add(&teaMinion) mergeableIngresses, err := lbc.createMergableIngresses(&cafeMaster) if err != nil { @@ -220,7 +220,7 @@ func TestCreateMergableIngressesInvalidMaster(t *testing.T) { }, }, } - lbc.ingLister.Add(&cafeMaster) + lbc.ingressLister.Add(&cafeMaster) expected := fmt.Errorf("Ingress Resource %v/%v with the 'nginx.org/mergeable-ingress-type' annotation set to 'master' cannot contain Paths", cafeMaster.Namespace, cafeMaster.Name) _, err := lbc.createMergableIngresses(&cafeMaster) @@ -237,9 +237,9 @@ func TestFindMasterForMinion(t *testing.T) { Paths: []extensions.HTTPIngressPath{}, } - lbc.ingLister.Add(&cafeMaster) - lbc.ingLister.Add(&coffeeMinion) - lbc.ingLister.Add(&teaMinion) + lbc.ingressLister.Add(&cafeMaster) + lbc.ingressLister.Add(&coffeeMinion) + lbc.ingressLister.Add(&teaMinion) master, err := lbc.findMasterForMinion(&coffeeMinion) if err != nil { @@ -261,8 +261,8 @@ func TestFindMasterForMinion(t *testing.T) { func TestFindMasterForMinionNoMaster(t *testing.T) { _, coffeeMinion, teaMinion, lbc := getMergableDefaults() - lbc.ingLister.Add(&coffeeMinion) - lbc.ingLister.Add(&teaMinion) + lbc.ingressLister.Add(&coffeeMinion) + lbc.ingressLister.Add(&teaMinion) expected := fmt.Errorf("Could not find a Master for Minion: '%v/%v'", coffeeMinion.Namespace, coffeeMinion.Name) _, err := lbc.findMasterForMinion(&coffeeMinion) @@ -291,8 +291,8 @@ func TestFindMasterForMinionInvalidMinion(t *testing.T) { }, } - lbc.ingLister.Add(&cafeMaster) - lbc.ingLister.Add(&coffeeMinion) + lbc.ingressLister.Add(&cafeMaster) + lbc.ingressLister.Add(&coffeeMinion) master, err := lbc.findMasterForMinion(&coffeeMinion) if err != nil { @@ -311,9 +311,9 @@ func TestGetMinionsForMaster(t *testing.T) { Paths: []extensions.HTTPIngressPath{}, } - lbc.ingLister.Add(&cafeMaster) - lbc.ingLister.Add(&coffeeMinion) - lbc.ingLister.Add(&teaMinion) + lbc.ingressLister.Add(&cafeMaster) + lbc.ingressLister.Add(&coffeeMinion) + lbc.ingressLister.Add(&teaMinion) cafeMasterIngEx, err := lbc.createIngress(&cafeMaster) if err != nil { @@ -364,9 +364,9 @@ func TestGetMinionsForMasterInvalidMinion(t *testing.T) { }, } - lbc.ingLister.Add(&cafeMaster) - lbc.ingLister.Add(&coffeeMinion) - lbc.ingLister.Add(&teaMinion) + lbc.ingressLister.Add(&cafeMaster) + lbc.ingressLister.Add(&coffeeMinion) + lbc.ingressLister.Add(&teaMinion) cafeMasterIngEx, err := lbc.createIngress(&cafeMaster) if err != nil { @@ -421,9 +421,9 @@ func TestGetMinionsForMasterConflictingPaths(t *testing.T) { }, }) - lbc.ingLister.Add(&cafeMaster) - lbc.ingLister.Add(&coffeeMinion) - lbc.ingLister.Add(&teaMinion) + lbc.ingressLister.Add(&cafeMaster) + lbc.ingressLister.Add(&coffeeMinion) + lbc.ingressLister.Add(&teaMinion) cafeMasterIngEx, err := lbc.createIngress(&cafeMaster) if err != nil { @@ -564,12 +564,12 @@ func getMergableDefaults() (cafeMaster, coffeeMinion, teaMinion extensions.Ingre lbc = LoadBalancerController{ client: fakeClient, ingressClass: "nginx", - cnf: cnf, + configurator: cnf, } lbc.svcLister, _ = cache.NewInformer( cache.NewListWatchFromClient(lbc.client.ExtensionsV1beta1().RESTClient(), "services", "default", fields.Everything()), &extensions.Ingress{}, time.Duration(1), nil) - lbc.ingLister.Store, _ = cache.NewInformer( + lbc.ingressLister.Store, _ = cache.NewInformer( cache.NewListWatchFromClient(lbc.client.ExtensionsV1beta1().RESTClient(), "ingresses", "default", fields.Everything()), &extensions.Ingress{}, time.Duration(1), nil) coffeeService := v1.Service{ @@ -786,7 +786,7 @@ func TestGetServicePortForIngressPort(t *testing.T) { lbc := LoadBalancerController{ client: fakeClient, ingressClass: "nginx", - cnf: cnf, + configurator: cnf, } svc := v1.Service{ TypeMeta: meta_v1.TypeMeta{}, @@ -940,15 +940,15 @@ func TestFindIngressesForSecret(t *testing.T) { lbc := LoadBalancerController{ client: fakeClient, ingressClass: "nginx", - cnf: cnf, - nginxPlus: true, + configurator: cnf, + isNginxPlus: true, } - lbc.ingLister.Store, _ = cache.NewInformer( + lbc.ingressLister.Store, _ = cache.NewInformer( cache.NewListWatchFromClient(lbc.client.ExtensionsV1beta1().RESTClient(), "ingresses", "default", fields.Everything()), &extensions.Ingress{}, time.Duration(1), nil) - lbc.secrLister.Store, lbc.secrController = cache.NewInformer( + lbc.secretLister.Store, lbc.secretController = cache.NewInformer( cache.NewListWatchFromClient(lbc.client.Core().RESTClient(), "secrets", "default", fields.Everything()), &v1.Secret{}, time.Duration(1), nil) @@ -964,8 +964,8 @@ func TestFindIngressesForSecret(t *testing.T) { t.Fatalf("Ingress was not added: %v", err) } - lbc.ingLister.Add(&test.ingress) - lbc.secrLister.Add(&test.secret) + lbc.ingressLister.Add(&test.ingress) + lbc.secretLister.Add(&test.secret) nonMinions, minions, err := lbc.findIngressesForSecret(test.secret.ObjectMeta.Namespace, test.secret.ObjectMeta.Name) if err != nil { diff --git a/internal/controller/utils.go b/internal/controller/utils.go index 90ad817190..33f11ed359 100644 --- a/internal/controller/utils.go +++ b/internal/controller/utils.go @@ -31,9 +31,9 @@ import ( "github.com/golang/glog" ) -// taskQueue manages a work queue through an independent worker that +// TaskQueue manages a work queue through an independent worker that // invokes the given sync function for every work item inserted. -type taskQueue struct { +type TaskQueue struct { // queue is the work queue the worker polls queue *workqueue.Type // sync is called for each item in the queue @@ -42,12 +42,12 @@ type taskQueue struct { workerDone chan struct{} } -func (t *taskQueue) run(period time.Duration, stopCh <-chan struct{}) { +func (t *TaskQueue) run(period time.Duration, stopCh <-chan struct{}) { wait.Until(t.worker, period, stopCh) } // enqueue enqueues ns/name of the given api object in the task queue. -func (t *taskQueue) enqueue(obj interface{}) { +func (t *TaskQueue) enqueue(obj interface{}) { key, err := keyFunc(obj) if err != nil { glog.V(3).Infof("Couldn't get key for object %v: %v", obj, err) @@ -65,12 +65,12 @@ func (t *taskQueue) enqueue(obj interface{}) { t.queue.Add(task) } -func (t *taskQueue) requeue(task Task, err error) { +func (t *TaskQueue) requeue(task Task, err error) { glog.Errorf("Requeuing %v, err %v", task.Key, err) t.queue.Add(task) } -func (t *taskQueue) requeueAfter(task Task, err error, after time.Duration) { +func (t *TaskQueue) requeueAfter(task Task, err error, after time.Duration) { glog.Errorf("Requeuing %v after %s, err %v", task.Key, after.String(), err) go func(task Task, after time.Duration) { time.Sleep(after) @@ -79,7 +79,7 @@ func (t *taskQueue) requeueAfter(task Task, err error, after time.Duration) { } // worker processes work in the queue through sync. -func (t *taskQueue) worker() { +func (t *TaskQueue) worker() { for { task, quit := t.queue.Get() if quit { @@ -93,15 +93,15 @@ func (t *taskQueue) worker() { } // shutdown shuts down the work queue and waits for the worker to ACK -func (t *taskQueue) shutdown() { +func (t *TaskQueue) shutdown() { t.queue.ShutDown() <-t.workerDone } // NewTaskQueue creates a new task queue with the given sync function. // The sync function is called for every element inserted into the queue. -func NewTaskQueue(syncFn func(Task)) *taskQueue { - return &taskQueue{ +func NewTaskQueue(syncFn func(Task)) *TaskQueue { + return &TaskQueue{ queue: workqueue.New(), sync: syncFn, workerDone: make(chan struct{}), @@ -126,7 +126,7 @@ const ( Service ) -// Task is an element of a taskQueue +// Task is an element of a TaskQueue type Task struct { Kind Kind Key string