Skip to content

Commit

Permalink
Use SharedIndexInformers in place of Informers (#2271)
Browse files Browse the repository at this point in the history
  • Loading branch information
antoineco authored and aledbf committed Mar 29, 2018
1 parent 5738ddb commit b09ecf7
Showing 1 changed file with 65 additions and 56 deletions.
121 changes: 65 additions & 56 deletions internal/ingress/controller/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ import (

apiv1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
"k8s.io/apimachinery/pkg/fields"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
cache_client "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"

"k8s.io/ingress-nginx/internal/file"
Expand Down Expand Up @@ -120,6 +120,15 @@ type Event struct {
Obj interface{}
}

// Informer defines the required SharedIndexInformers that interact with the API server.
type Informer struct {
Ingress cache.SharedIndexInformer
Endpoint cache.SharedIndexInformer
Service cache.SharedIndexInformer
Secret cache.SharedIndexInformer
ConfigMap cache.SharedIndexInformer
}

// Lister returns the stores for ingresses, services, endpoints, secrets and configmaps.
type Lister struct {
Ingress IngressLister
Expand All @@ -130,38 +139,34 @@ type Lister struct {
IngressAnnotation IngressAnnotationsLister
}

// Controller defines the required controllers that interact against the api server
type Controller struct {
Ingress cache.Controller
Endpoint cache.Controller
Service cache.Controller
Secret cache.Controller
Configmap cache.Controller
}
// Run initiates the synchronization of the informers against the API server.
func (i *Informer) Run(stopCh chan struct{}) {
go i.Endpoint.Run(stopCh)
go i.Service.Run(stopCh)
go i.Secret.Run(stopCh)
go i.ConfigMap.Run(stopCh)

// Run initiates the synchronization of the controllers against the api server
func (c *Controller) Run(stopCh chan struct{}) {
go c.Endpoint.Run(stopCh)
go c.Service.Run(stopCh)
go c.Secret.Run(stopCh)
go c.Configmap.Run(stopCh)

// Wait for all involved caches to be synced, before processing items from the queue is started
// wait for all involved caches to be synced before processing items
// from the queue
if !cache.WaitForCacheSync(stopCh,
c.Endpoint.HasSynced,
c.Service.HasSynced,
c.Secret.HasSynced,
c.Configmap.HasSynced,
i.Endpoint.HasSynced,
i.Service.HasSynced,
i.Secret.HasSynced,
i.ConfigMap.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}

// We need to wait before start syncing the ingress rules
// because the rules requires content from other listers
// in big clusters, deltas can keep arriving even after HasSynced
// functions have returned 'true'
time.Sleep(1 * time.Second)
go c.Ingress.Run(stopCh)

// we can start syncing ingress objects only after other caches are
// ready, because ingress rules require content from other listers, and
// 'add' events get triggered in the handlers during caches population.
go i.Ingress.Run(stopCh)
if !cache.WaitForCacheSync(stopCh,
c.Ingress.HasSynced,
i.Ingress.HasSynced,
) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
}
Expand All @@ -176,10 +181,10 @@ type k8sStore struct {
// operation to execute in each OnUpdate invocation
backendConfig ngx_config.Configuration

// cache contains the cache Controllers
cache *Controller
// informer contains the cache Informers
informers *Informer

// listers contains the cache.Store used in the ingress controller
// listers contains the cache.Store interfaces used in the ingress controller
listers *Lister

// sslStore local store of SSL certificates (certificates used in ingress)
Expand Down Expand Up @@ -214,7 +219,7 @@ func New(checkOCSP bool,

store := &k8sStore{
isOCSPCheckEnabled: checkOCSP,
cache: &Controller{},
informers: &Informer{},
listers: &Lister{},
sslStore: NewSSLCertTracker(),
filesystem: fs,
Expand All @@ -237,6 +242,26 @@ func New(checkOCSP bool,
// k8sStore fulfils resolver.Resolver interface
store.annotations = annotations.NewAnnotationExtractor(store)

store.listers.IngressAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)

// create informers factory, enable and assign required informers
infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})

store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
store.listers.Ingress.Store = store.informers.Ingress.GetStore()

store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()

store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
store.listers.Secret.Store = store.informers.Secret.GetStore()

store.informers.ConfigMap = infFactory.Core().V1().ConfigMaps().Informer()
store.listers.ConfigMap.Store = store.informers.ConfigMap.GetStore()

store.informers.Service = infFactory.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()

ingEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
Expand Down Expand Up @@ -372,7 +397,7 @@ func New(checkOCSP bool,
},
}

eventHandler := cache.ResourceEventHandlerFuncs{
epEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
updateCh.In() <- Event{
Type: CreateEvent,
Expand Down Expand Up @@ -434,27 +459,11 @@ func New(checkOCSP bool,
},
}

store.listers.IngressAnnotation.Store = cache_client.NewStore(cache_client.DeletionHandlingMetaNamespaceKeyFunc)

store.listers.Ingress.Store, store.cache.Ingress = cache.NewInformer(
cache.NewListWatchFromClient(client.ExtensionsV1beta1().RESTClient(), "ingresses", namespace, fields.Everything()),
&extensions.Ingress{}, resyncPeriod, ingEventHandler)

store.listers.Endpoint.Store, store.cache.Endpoint = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "endpoints", namespace, fields.Everything()),
&apiv1.Endpoints{}, resyncPeriod, eventHandler)

store.listers.Secret.Store, store.cache.Secret = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "secrets", namespace, fields.Everything()),
&apiv1.Secret{}, resyncPeriod, secrEventHandler)

store.listers.ConfigMap.Store, store.cache.Configmap = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "configmaps", namespace, fields.Everything()),
&apiv1.ConfigMap{}, resyncPeriod, mapEventHandler)

store.listers.Service.Store, store.cache.Service = cache.NewInformer(
cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "services", namespace, fields.Everything()),
&apiv1.Service{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
store.informers.Ingress.AddEventHandler(ingEventHandler)
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(mapEventHandler)
store.informers.Service.AddEventHandler(cache.ResourceEventHandlerFuncs{})

return store
}
Expand Down Expand Up @@ -611,11 +620,11 @@ func (s *k8sStore) setConfig(cmap *apiv1.ConfigMap) {
}
}

// Run initiates the synchronization of the controllers
// and the initial synchronization of the secrets.
// Run initiates the synchronization of the informers and the initial
// synchronization of the secrets.
func (s k8sStore) Run(stopCh chan struct{}) {
// start controllers
s.cache.Run(stopCh)
// start informers
s.informers.Run(stopCh)

// initial sync of secrets to avoid unnecessary reloads
glog.Info("running initial sync of secrets")
Expand Down

0 comments on commit b09ecf7

Please sign in to comment.