Skip to content

Commit

Permalink
Refactor watchers - Create separate type for namespaced informers (ng…
Browse files Browse the repository at this point in the history
…inx#3238)

* Create separate type for namespaced watchers

* Create separate type for namespaced watchers in certmanager controller

* Create separate type for namespaced watchers in extdns controller

* Fix case where namespace is not watching secrets
  • Loading branch information
ciarams87 authored and coolbry95 committed Nov 18, 2022
1 parent 668122a commit 25d74d7
Show file tree
Hide file tree
Showing 12 changed files with 573 additions and 592 deletions.
116 changes: 60 additions & 56 deletions internal/certmanager/cm_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,15 @@ const (
// and creates/ updates certificates for VS resources as required,
// and VS resources when certificate objects are created/ updated
type CmController struct {
vsLister []listers_v1.VirtualServerLister
sync SyncFn
ctx context.Context
mustSync []cache.InformerSynced
queue workqueue.RateLimitingInterface
vsSharedInformerFactory []vsinformers.SharedInformerFactory
cmSharedInformerFactory []cm_informers.SharedInformerFactory
kubeSharedInformerFactory []kubeinformers.SharedInformerFactory
recorder record.EventRecorder
cmClient *cm_clientset.Clientset
sync SyncFn
ctx context.Context
mustSync []cache.InformerSynced
queue workqueue.RateLimitingInterface
informerGroup map[string]*namespacedInformer
recorder record.EventRecorder
cmClient *cm_clientset.Clientset
kubeClient kubernetes.Interface
vsClient k8s_nginx.Interface
}

// CmOpts is the options required for building the CmController
Expand All @@ -78,27 +77,42 @@ type CmOpts struct {
vsClient k8s_nginx.Interface
}

type namespacedInformer struct {
vsSharedInformerFactory vsinformers.SharedInformerFactory
cmSharedInformerFactory cm_informers.SharedInformerFactory
kubeSharedInformerFactory kubeinformers.SharedInformerFactory
vsLister listers_v1.VirtualServerLister
cmLister cmlisters.CertificateLister
}

func (c *CmController) register() workqueue.RateLimitingInterface {
var cmLister []cmlisters.CertificateLister
for _, sif := range c.vsSharedInformerFactory {
c.vsLister = append(c.vsLister, sif.K8s().V1().VirtualServers().Lister())
sif.K8s().V1().VirtualServers().Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{
Queue: c.queue,
})
c.mustSync = append(c.mustSync, sif.K8s().V1().VirtualServers().Informer().HasSynced)
}
c.sync = SyncFnFor(c.recorder, c.cmClient, c.informerGroup)
return c.queue
}

for _, cif := range c.cmSharedInformerFactory {
cif.Certmanager().V1().Certificates().Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
WorkFunc: certificateHandler(c.queue),
})
cmLister = append(cmLister, cif.Certmanager().V1().Certificates().Lister())
c.mustSync = append(c.mustSync, cif.Certmanager().V1().Certificates().Informer().HasSynced)
}
func (c *CmController) newNamespacedInformer(ns string) {
nsi := &namespacedInformer{}
nsi.cmSharedInformerFactory = cm_informers.NewSharedInformerFactoryWithOptions(c.cmClient, resyncPeriod, cm_informers.WithNamespace(ns))
nsi.kubeSharedInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(c.kubeClient, resyncPeriod, kubeinformers.WithNamespace(ns))
nsi.vsSharedInformerFactory = vsinformers.NewSharedInformerFactoryWithOptions(c.vsClient, resyncPeriod, vsinformers.WithNamespace(ns))

c.sync = SyncFnFor(c.recorder, c.cmClient, cmLister)
c.addHandlers(nsi)

return c.queue
c.informerGroup[ns] = nsi
}

func (c *CmController) addHandlers(nsi *namespacedInformer) {
nsi.vsLister = nsi.vsSharedInformerFactory.K8s().V1().VirtualServers().Lister()
nsi.vsSharedInformerFactory.K8s().V1().VirtualServers().Informer().AddEventHandler(&controllerpkg.QueuingEventHandler{
Queue: c.queue,
})
c.mustSync = append(c.mustSync, nsi.vsSharedInformerFactory.K8s().V1().VirtualServers().Informer().HasSynced)

nsi.cmSharedInformerFactory.Certmanager().V1().Certificates().Informer().AddEventHandler(&controllerpkg.BlockingEventHandler{
WorkFunc: certificateHandler(c.queue),
})
nsi.cmLister = nsi.cmSharedInformerFactory.Certmanager().V1().Certificates().Lister()
c.mustSync = append(c.mustSync, nsi.cmSharedInformerFactory.Certmanager().V1().Certificates().Informer().HasSynced)
}

func (c *CmController) processItem(ctx context.Context, key string) error {
Expand All @@ -108,14 +122,11 @@ func (c *CmController) processItem(ctx context.Context, key string) error {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return err
}
nsi := getNamespacedInformer(namespace, c.informerGroup)

var vs *conf_v1.VirtualServer
for _, vl := range c.vsLister {
vs, err = vl.VirtualServers(namespace).Get(name)
if err == nil {
break
}
}
vs, err = nsi.vsLister.VirtualServers(namespace).Get(name)

if err != nil {
return err
}
Expand Down Expand Up @@ -168,25 +179,22 @@ func NewCmController(opts *CmOpts) *CmController {
// Create a cert-manager api client
intcl, _ := cm_clientset.NewForConfig(opts.kubeConfig)

var vsSharedInformerFactory []vsinformers.SharedInformerFactory
var cmSharedInformerFactory []cm_informers.SharedInformerFactory
var kubeSharedInformerFactory []kubeinformers.SharedInformerFactory
ig := make(map[string]*namespacedInformer)

for _, ns := range opts.namespace {
cmSharedInformerFactory = append(cmSharedInformerFactory, cm_informers.NewSharedInformerFactoryWithOptions(intcl, resyncPeriod, cm_informers.WithNamespace(ns)))
kubeSharedInformerFactory = append(kubeSharedInformerFactory, kubeinformers.NewSharedInformerFactoryWithOptions(opts.kubeClient, resyncPeriod, kubeinformers.WithNamespace(ns)))
vsSharedInformerFactory = append(vsSharedInformerFactory, vsinformers.NewSharedInformerFactoryWithOptions(opts.vsClient, resyncPeriod, vsinformers.WithNamespace(ns)))
cm := &CmController{
ctx: opts.context,
queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName),
informerGroup: ig,
recorder: opts.eventRecorder,
cmClient: intcl,
kubeClient: opts.kubeClient,
vsClient: opts.vsClient,
}

cm := &CmController{
ctx: opts.context,
queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName),
cmSharedInformerFactory: cmSharedInformerFactory,
kubeSharedInformerFactory: kubeSharedInformerFactory,
recorder: opts.eventRecorder,
cmClient: intcl,
vsSharedInformerFactory: vsSharedInformerFactory,
for _, ns := range opts.namespace {
cm.newNamespacedInformer(ns)
}

cm.register()
return cm
}
Expand All @@ -201,14 +209,10 @@ func (c *CmController) Run(stopCh <-chan struct{}) {

glog.Infof("Starting cert-manager control loop")

for _, vif := range c.vsSharedInformerFactory {
go vif.Start(c.ctx.Done())
}
for _, cif := range c.cmSharedInformerFactory {
go cif.Start(c.ctx.Done())
}
for _, kif := range c.kubeSharedInformerFactory {
go kif.Start(c.ctx.Done())
for _, ig := range c.informerGroup {
go ig.vsSharedInformerFactory.Start(c.ctx.Done())
go ig.cmSharedInformerFactory.Start(c.ctx.Done())
go ig.kubeSharedInformerFactory.Start(c.ctx.Done())
}
// // wait for all the informer caches we depend on are synced
glog.V(3).Infof("Waiting for %d caches to sync", len(c.mustSync))
Expand Down
27 changes: 18 additions & 9 deletions internal/certmanager/cm_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,16 @@ import (

cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
cmclient "github.com/cert-manager/cert-manager/pkg/client/clientset/versioned"
cm_informers "github.com/cert-manager/cert-manager/pkg/client/informers/externalversions"
controllerpkg "github.com/cert-manager/cert-manager/pkg/controller"
testpkg "github.com/nginxinc/kubernetes-ingress/internal/certmanager/test_files"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/util/workqueue"

vsapi "github.com/nginxinc/kubernetes-ingress/pkg/apis/configuration/v1"
k8s_nginx "github.com/nginxinc/kubernetes-ingress/pkg/client/clientset/versioned"
vsinformers "github.com/nginxinc/kubernetes-ingress/pkg/client/informers/externalversions"
)

func Test_controller_Register(t *testing.T) {
Expand Down Expand Up @@ -138,15 +135,27 @@ func Test_controller_Register(t *testing.T) {
// Certificate event is received then HasSynced has not been setup
// properly.

ig := make(map[string]*namespacedInformer)

nsi := &namespacedInformer{
cmSharedInformerFactory: b.Context.SharedInformerFactory,
kubeSharedInformerFactory: b.Context.KubeSharedInformerFactory,
vsSharedInformerFactory: b.VsSharedInformerFactory,
}

ig[""] = nsi

cm := &CmController{
ctx: b.RootContext,
queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName),
cmSharedInformerFactory: []cm_informers.SharedInformerFactory{b.FakeCMInformerFactory()},
kubeSharedInformerFactory: []kubeinformers.SharedInformerFactory{b.FakeKubeInformerFactory()},
recorder: b.Recorder,
vsSharedInformerFactory: []vsinformers.SharedInformerFactory{b.VsSharedInformerFactory},
ctx: b.RootContext,
queue: workqueue.NewNamedRateLimitingQueue(controllerpkg.DefaultItemBasedRateLimiter(), ControllerName),
informerGroup: ig,
recorder: b.Recorder,
kubeClient: b.Client,
vsClient: b.VSClient,
}

cm.addHandlers(nsi)

queue := cm.register()

b.Start()
Expand Down
18 changes: 18 additions & 0 deletions internal/certmanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,21 @@ func translateVsSpec(crt *cmapi.Certificate, vsCmSpec *vsapi.CertManager) error
}
return nil
}

func getNamespacedInformer(ns string, ig map[string]*namespacedInformer) *namespacedInformer {
var nsi *namespacedInformer
var isGlobalNs bool
var exists bool

nsi, isGlobalNs = ig[""]

if !isGlobalNs {
// get the correct namespaced informers
nsi, exists = ig[ns]
if !exists {
// we are not watching this namespace
return nil
}
}
return nsi
}
24 changes: 9 additions & 15 deletions internal/certmanager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type SyncFn func(context.Context, *vsapi.VirtualServer) error
func SyncFnFor(
rec record.EventRecorder,
cmClient clientset.Interface,
cmLister []cmlisters.CertificateLister,
ig map[string]*namespacedInformer,
) SyncFn {
return func(ctx context.Context, vs *vsapi.VirtualServer) error {
var err error
Expand All @@ -75,7 +75,9 @@ func SyncFnFor(
return err
}

newCrts, updateCrts, err := buildCertificates(cmLister, vs, issuerName, issuerKind, issuerGroup)
nsi := getNamespacedInformer(vs.GetNamespace(), ig)

newCrts, updateCrts, err := buildCertificates(nsi.cmLister, vs, issuerName, issuerKind, issuerGroup)
if err != nil {
glog.Errorf("Incorrect cert-manager configuration for VirtualServer resource: %v", err)
rec.Eventf(vs, corev1.EventTypeWarning, reasonBadConfig, "Incorrect cert-manager configuration for VirtualServer resource: %s",
Expand Down Expand Up @@ -106,12 +108,8 @@ func SyncFnFor(
}
var certs []*cmapi.Certificate

for _, cl := range cmLister {
certs, err = cl.Certificates(vs.GetNamespace()).List(labels.Everything())
if len(certs) > 0 {
break
}
}
certs, err = nsi.cmLister.Certificates(vs.GetNamespace()).List(labels.Everything())

if err != nil {
return err
}
Expand All @@ -131,7 +129,7 @@ func SyncFnFor(
}

func buildCertificates(
cmLister []cmlisters.CertificateLister,
cmLister cmlisters.CertificateLister,
vs *vsapi.VirtualServer,
issuerName, issuerKind, issuerGroup string,
) (newCert, update []*cmapi.Certificate, _ error) {
Expand All @@ -140,12 +138,8 @@ func buildCertificates(
var existingCrt *cmapi.Certificate
var err error

for _, cl := range cmLister {
existingCrt, err = cl.Certificates(vs.Namespace).Get(vs.Spec.TLS.Secret)
if err == nil {
break
}
}
existingCrt, err = cmLister.Certificates(vs.Namespace).Get(vs.Spec.TLS.Secret)

if !apierrors.IsNotFound(err) && err != nil {
return nil, nil, err
}
Expand Down
15 changes: 13 additions & 2 deletions internal/certmanager/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

cmapi "github.com/cert-manager/cert-manager/pkg/apis/certmanager/v1"
cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
cmlisters "github.com/cert-manager/cert-manager/pkg/client/listers/certmanager/v1"
"github.com/cert-manager/cert-manager/test/unit/gen"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -491,7 +490,19 @@ func TestSync(t *testing.T) {
}
b.Init()
defer b.Stop()
sync := SyncFnFor(b.Recorder, b.CMClient, []cmlisters.CertificateLister{b.SharedInformerFactory.Certmanager().V1().Certificates().Lister()})

ig := make(map[string]*namespacedInformer)

nsi := &namespacedInformer{
cmSharedInformerFactory: b.FakeCMInformerFactory(),
kubeSharedInformerFactory: b.FakeKubeInformerFactory(),
vsSharedInformerFactory: b.VsSharedInformerFactory,
cmLister: b.SharedInformerFactory.Certmanager().V1().Certificates().Lister(),
}

ig[""] = nsi

sync := SyncFnFor(b.Recorder, b.CMClient, ig)
b.Start()

err := sync(context.Background(), &test.VirtualServer)
Expand Down
Loading

0 comments on commit 25d74d7

Please sign in to comment.