diff --git a/internal/ingress/controller/store/store.go b/internal/ingress/controller/store/store.go index facf079fa5..9d57a9e2be 100644 --- a/internal/ingress/controller/store/store.go +++ b/internal/ingress/controller/store/store.go @@ -29,15 +29,15 @@ import ( apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" - "k8s.io/apimachinery/pkg/fields" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" - cache_client "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/ingress-nginx/internal/file" @@ -120,6 +120,15 @@ type Event struct { Obj interface{} } +// Informer defines the required SharedIndexInformers that interact with the API server. +type Informer struct { + Ingress cache.SharedIndexInformer + Endpoint cache.SharedIndexInformer + Service cache.SharedIndexInformer + Secret cache.SharedIndexInformer + ConfigMap cache.SharedIndexInformer +} + // Lister returns the stores for ingresses, services, endpoints, secrets and configmaps. type Lister struct { Ingress IngressLister @@ -130,38 +139,34 @@ type Lister struct { IngressAnnotation IngressAnnotationsLister } -// Controller defines the required controllers that interact against the api server -type Controller struct { - Ingress cache.Controller - Endpoint cache.Controller - Service cache.Controller - Secret cache.Controller - Configmap cache.Controller -} +// Run initiates the synchronization of the informers against the API server. +func (i *Informer) Run(stopCh chan struct{}) { + go i.Endpoint.Run(stopCh) + go i.Service.Run(stopCh) + go i.Secret.Run(stopCh) + go i.ConfigMap.Run(stopCh) -// Run initiates the synchronization of the controllers against the api server -func (c *Controller) Run(stopCh chan struct{}) { - go c.Endpoint.Run(stopCh) - go c.Service.Run(stopCh) - go c.Secret.Run(stopCh) - go c.Configmap.Run(stopCh) - - // Wait for all involved caches to be synced, before processing items from the queue is started + // wait for all involved caches to be synced before processing items + // from the queue if !cache.WaitForCacheSync(stopCh, - c.Endpoint.HasSynced, - c.Service.HasSynced, - c.Secret.HasSynced, - c.Configmap.HasSynced, + i.Endpoint.HasSynced, + i.Service.HasSynced, + i.Secret.HasSynced, + i.ConfigMap.HasSynced, ) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) } - // We need to wait before start syncing the ingress rules - // because the rules requires content from other listers + // in big clusters, deltas can keep arriving even after HasSynced + // functions have returned 'true' time.Sleep(1 * time.Second) - go c.Ingress.Run(stopCh) + + // we can start syncing ingress objects only after other caches are + // ready, because ingress rules require content from other listers, and + // 'add' events get triggered in the handlers during caches population. + go i.Ingress.Run(stopCh) if !cache.WaitForCacheSync(stopCh, - c.Ingress.HasSynced, + i.Ingress.HasSynced, ) { runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) } @@ -176,10 +181,10 @@ type k8sStore struct { // operation to execute in each OnUpdate invocation backendConfig ngx_config.Configuration - // cache contains the cache Controllers - cache *Controller + // informer contains the cache Informers + informers *Informer - // listers contains the cache.Store used in the ingress controller + // listers contains the cache.Store interfaces used in the ingress controller listers *Lister // sslStore local store of SSL certificates (certificates used in ingress) @@ -214,7 +219,7 @@ func New(checkOCSP bool, store := &k8sStore{ isOCSPCheckEnabled: checkOCSP, - cache: &Controller{}, + informers: &Informer{}, listers: &Lister{}, sslStore: NewSSLCertTracker(), filesystem: fs, @@ -237,6 +242,26 @@ func New(checkOCSP bool, // k8sStore fulfils resolver.Resolver interface store.annotations = annotations.NewAnnotationExtractor(store) + store.listers.IngressAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc) + + // create informers factory, enable and assign required informers + infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {}) + + store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer() + store.listers.Ingress.Store = store.informers.Ingress.GetStore() + + store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer() + store.listers.Endpoint.Store = store.informers.Endpoint.GetStore() + + store.informers.Secret = infFactory.Core().V1().Secrets().Informer() + store.listers.Secret.Store = store.informers.Secret.GetStore() + + store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer() + store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore() + + store.informers.Service = infFactory.Core().V1().Services().Informer() + store.listers.Service.Store = store.informers.Service.GetStore() + ingEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addIng := obj.(*extensions.Ingress) @@ -372,7 +397,7 @@ func New(checkOCSP bool, }, } - eventHandler := cache.ResourceEventHandlerFuncs{ + epEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { updateCh.In() <- Event{ Type: CreateEvent, @@ -434,27 +459,11 @@ func New(checkOCSP bool, }, } - store.listers.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc) - - store.listers.Ingress.Store, store.cache.Ingress = cache.NewInformer( - cache.NewListWatchFromClient(client.ExtensionsV1beta1().RESTClient(), "ingresses", namespace, fields.Everything()), - &extensions.Ingress{}, resyncPeriod, ingEventHandler) - - store.listers.Endpoint.Store, store.cache.Endpoint = cache.NewInformer( - cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", namespace, fields.Everything()), - &apiv1.Endpoints{}, resyncPeriod, eventHandler) - - store.listers.Secret.Store, store.cache.Secret = cache.NewInformer( - cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "secrets", namespace, fields.Everything()), - &apiv1.Secret{}, resyncPeriod, secrEventHandler) - - store.listers.ConfigMap.Store, store.cache.Configmap = cache.NewInformer( - cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()), - &apiv1.ConfigMap{}, resyncPeriod, mapEventHandler) - - store.listers.Service.Store, store.cache.Service = cache.NewInformer( - cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "services", namespace, fields.Everything()), - &apiv1.Service{}, resyncPeriod, cache.ResourceEventHandlerFuncs{}) + store.informers.Ingress.AddEventHandler(ingEventHandler) + store.informers.Endpoint.AddEventHandler(epEventHandler) + store.informers.Secret.AddEventHandler(secrEventHandler) + store.informers.ConfigMap.AddEventHandler(mapEventHandler) + store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{}) return store } @@ -611,11 +620,11 @@ func (s *k8sStore) setConfig(cmap *apiv1.ConfigMap) { } } -// Run initiates the synchronization of the controllers -// and the initial synchronization of the secrets. +// Run initiates the synchronization of the informers and the initial +// synchronization of the secrets. func (s k8sStore) Run(stopCh chan struct{}) { - // start controllers - s.cache.Run(stopCh) + // start informers + s.informers.Run(stopCh) // initial sync of secrets to avoid unnecessary reloads glog.Info("running initial sync of secrets")