diff --git a/pkg/controller/agent/controller.go b/pkg/controller/agent/controller.go index ab57ced8e3..6b6d46d5aa 100644 --- a/pkg/controller/agent/controller.go +++ b/pkg/controller/agent/controller.go @@ -43,7 +43,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c, r) + return addWatches(mgr, c, r) } // newReconciler returns a new reconcile.Reconciler. @@ -58,53 +58,62 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileAg } // addWatches adds watches for all resources this controller cares about -func addWatches(c controller.Controller, r *ReconcileAgent) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileAgent) error { // Watch for changes to Agent - if err := c.Watch(&source.Kind{Type: &agentv1alpha1.Agent{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch( + source.Kind(mgr.GetCache(), &agentv1alpha1.Agent{}), + &handler.EnqueueRequestForObject{}); err != nil { return err } // Watch DaemonSets - if err := c.Watch(&source.Kind{Type: &appsv1.DaemonSet{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &agentv1alpha1.Agent{}, - }); err != nil { + if err := c.Watch( + source.Kind(mgr.GetCache(), &appsv1.DaemonSet{}), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), + &agentv1alpha1.Agent{}, handler.OnlyControllerOwner()), + ); err != nil { return err } // Watch Deployments - if err := c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &agentv1alpha1.Agent{}, - }); err != nil { + if err := c.Watch( + source.Kind(mgr.GetCache(), &appsv1.Deployment{}), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), + &agentv1alpha1.Agent{}, handler.OnlyControllerOwner()), + ); err != nil { return err } // Watch Pods, to ensure `status.version` is correctly reconciled on any change. // Watching Deployments or DaemonSets only may lead to missing some events. - if err := watches.WatchPods(c, NameLabelName); err != nil { + if err := watches.WatchPods(mgr, c, NameLabelName); err != nil { return err } // Watch Secrets - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &agentv1alpha1.Agent{}, - }); err != nil { + if err := c.Watch( + source.Kind(mgr.GetCache(), &corev1.Secret{}), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), + &agentv1alpha1.Agent{}, handler.OnlyControllerOwner()), + ); err != nil { return err } // Watch services - Agent in Fleet mode with Fleet Server enabled configures and exposes a Service // for Elastic Agents to connect to. - if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &agentv1alpha1.Agent{}, - }); err != nil { + if err := c.Watch( + source.Kind(mgr.GetCache(), &corev1.Service{}), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), + &agentv1alpha1.Agent{}, handler.OnlyControllerOwner()), + ); err != nil { return err } // Watch dynamically referenced Secrets - return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets) + return c.Watch( + source.Kind(mgr.GetCache(), &corev1.Secret{}), + r.dynamicWatches.Secrets, + ) } var _ reconcile.Reconciler = &ReconcileAgent{} diff --git a/pkg/controller/apmserver/controller.go b/pkg/controller/apmserver/controller.go index 2889f5bd09..ae88fa380f 100644 --- a/pkg/controller/apmserver/controller.go +++ b/pkg/controller/apmserver/controller.go @@ -83,7 +83,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c, reconciler) + return addWatches(mgr, c, reconciler) } // newReconciler returns a new reconcile.Reconciler @@ -96,48 +96,48 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileAp } } -func addWatches(c controller.Controller, r *ReconcileApmServer) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileApmServer) error { // Watch for changes to ApmServer - err := c.Watch(&source.Kind{Type: &apmv1.ApmServer{}}, &handler.EnqueueRequestForObject{}) + err := c.Watch(source.Kind(mgr.GetCache(), &apmv1.ApmServer{}), &handler.EnqueueRequestForObject{}) if err != nil { return err } // Watch Deployments - if err := c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &apmv1.ApmServer{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &apmv1.ApmServer{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch Pods, to ensure `status.version` and version upgrades are correctly reconciled on any change. // Watching Deployments only may lead to missing some events. - if err := watches.WatchPods(c, ApmServerNameLabelName); err != nil { + if err := watches.WatchPods(mgr, c, ApmServerNameLabelName); err != nil { return err } // Watch services - if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &apmv1.ApmServer{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &apmv1.ApmServer{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch owned and soft-owned secrets - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &apmv1.ApmServer{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &apmv1.ApmServer{}, handler.OnlyControllerOwner(), + )); err != nil { return err } - if err := watches.WatchSoftOwnedSecrets(c, apmv1.Kind); err != nil { + if err := watches.WatchSoftOwnedSecrets(mgr, c, apmv1.Kind); err != nil { return err } // dynamically watch referenced secrets to connect to Elasticsearch - return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets) + return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.dynamicWatches.Secrets) } var _ reconcile.Reconciler = &ReconcileApmServer{} diff --git a/pkg/controller/association/controller.go b/pkg/controller/association/controller.go index 7bb74b0b1f..302f5abb7f 100644 --- a/pkg/controller/association/controller.go +++ b/pkg/controller/association/controller.go @@ -37,33 +37,33 @@ func AddAssociationController( if err != nil { return err } - return addWatches(c, r) + return addWatches(mgr, c, r) } -func addWatches(c controller.Controller, r *Reconciler) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *Reconciler) error { // Watch the associated resource (e.g. Kibana for a Kibana -> Elasticsearch association) - if err := c.Watch(&source.Kind{Type: r.AssociatedObjTemplate()}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), r.AssociatedObjTemplate()), &handler.EnqueueRequestForObject{}); err != nil { return err } // Watch Secrets owned by the associated resource - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{ - OwnerType: r.AssociatedObjTemplate(), - IsController: true, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + r.AssociatedObjTemplate(), handler.OnlyControllerOwner(), + )); err != nil { return err } // Dynamically watch the referenced resources (e.g. Elasticsearch B for a Kibana A -> Elasticsearch B association) - if err := c.Watch(&source.Kind{Type: r.ReferencedObjTemplate()}, r.watches.ReferencedResources); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), r.ReferencedObjTemplate()), r.watches.ReferencedResources); err != nil { return err } // Dynamically watch Secrets (CA Secret of the referenced resource, ES user secret or custom referenced object secret) - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.watches.Secrets); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.watches.Secrets); err != nil { return err } // Dynamically watch Service objects for custom services setup by the user - return c.Watch(&source.Kind{Type: &corev1.Service{}}, r.watches.Services) + return c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), r.watches.Services) } diff --git a/pkg/controller/autoscaling/elasticsearch.go b/pkg/controller/autoscaling/elasticsearch.go index 4a75de6538..b4e49580e7 100644 --- a/pkg/controller/autoscaling/elasticsearch.go +++ b/pkg/controller/autoscaling/elasticsearch.go @@ -26,7 +26,7 @@ func Add(mgr manager.Manager, p operator.Parameters) error { } // The deprecated/legacy controller watches for changes on Elasticsearch clusters. - if err := legacyController.Watch(&source.Kind{Type: &esv1.Elasticsearch{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := legacyController.Watch(source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}), &handler.EnqueueRequestForObject{}); err != nil { return err } @@ -36,8 +36,8 @@ func Add(mgr manager.Manager, p operator.Parameters) error { if err != nil { return err } - if err := controller.Watch(&source.Kind{Type: &v1alpha1.ElasticsearchAutoscaler{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := controller.Watch(source.Kind(mgr.GetCache(), &v1alpha1.ElasticsearchAutoscaler{}), &handler.EnqueueRequestForObject{}); err != nil { return err } - return controller.Watch(&source.Kind{Type: &esv1.Elasticsearch{}}, reconciler.Watches.ReferencedResources) + return controller.Watch(source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}), reconciler.Watches.ReferencedResources) } diff --git a/pkg/controller/beat/controller.go b/pkg/controller/beat/controller.go index e1b0ebcdde..14a3877a0b 100644 --- a/pkg/controller/beat/controller.go +++ b/pkg/controller/beat/controller.go @@ -52,7 +52,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c, r) + return addWatches(mgr, c, r) } // newReconciler returns a new reconcile.Reconciler. @@ -67,47 +67,47 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileBe } // addWatches adds watches for all resources this controller cares about -func addWatches(c controller.Controller, r *ReconcileBeat) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileBeat) error { // Watch for changes to Beat - if err := c.Watch(&source.Kind{Type: &beatv1beta1.Beat{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &beatv1beta1.Beat{}), &handler.EnqueueRequestForObject{}); err != nil { return err } // Watch DaemonSets - if err := c.Watch(&source.Kind{Type: &appsv1.DaemonSet{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &beatv1beta1.Beat{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.DaemonSet{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &beatv1beta1.Beat{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch Deployments - if err := c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &beatv1beta1.Beat{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &beatv1beta1.Beat{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch Pods, to ensure `status.version` is correctly reconciled on any change. // Watching Deployments or DaemonSets only may lead to missing some events. - if err := watches.WatchPods(c, beatcommon.NameLabelName); err != nil { + if err := watches.WatchPods(mgr, c, beatcommon.NameLabelName); err != nil { return err } // Watch owned and soft-owned Secrets - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &beatv1beta1.Beat{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &beatv1beta1.Beat{}, handler.OnlyControllerOwner(), + )); err != nil { return err } - if err := watches.WatchSoftOwnedSecrets(c, beatv1beta1.Kind); err != nil { + if err := watches.WatchSoftOwnedSecrets(mgr, c, beatv1beta1.Kind); err != nil { return err } // Watch dynamically referenced Secrets - return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets) + return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.dynamicWatches.Secrets) } var _ reconcile.Reconciler = &ReconcileBeat{} diff --git a/pkg/controller/common/reconciler/handler.go b/pkg/controller/common/reconciler/handler.go index 613c26d93c..e0ce613d65 100644 --- a/pkg/controller/common/reconciler/handler.go +++ b/pkg/controller/common/reconciler/handler.go @@ -5,6 +5,8 @@ package reconciler import ( + "context" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -14,7 +16,7 @@ import ( // GenericEventHandler returns an EventHandler that enqueues a reconciliation request // from the generic event NamespacedName. func GenericEventHandler() handler.EventHandler { - return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request { + return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request { return []reconcile.Request{ { NamespacedName: types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()}, diff --git a/pkg/controller/common/watches/handler.go b/pkg/controller/common/watches/handler.go index 5b948a6c6c..92370cdc5c 100644 --- a/pkg/controller/common/watches/handler.go +++ b/pkg/controller/common/watches/handler.go @@ -5,14 +5,12 @@ package watches import ( + "context" "sync" - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/runtime/inject" ulog "github.com/elastic/cloud-on-k8s/v2/pkg/utils/log" ) @@ -42,8 +40,6 @@ func NewDynamicEnqueueRequest() *DynamicEnqueueRequest { type DynamicEnqueueRequest struct { mutex sync.RWMutex registrations map[string]HandlerRegistration - // mapper maps GroupVersionKinds to Resources - mapper meta.RESTMapper } // AddHandlers adds the new event handlers to this DynamicEnqueueRequest. @@ -60,14 +56,7 @@ func (d *DynamicEnqueueRequest) AddHandlers(handlers ...HandlerRegistration) err func (d *DynamicEnqueueRequest) AddHandler(handler HandlerRegistration) error { d.mutex.Lock() defer d.mutex.Unlock() - if _, err := inject.SchemeInto(scheme.Scheme, handler); err != nil { - log.Error(err, "Failed to add handler to dynamic enqueue request") - return err - } - if _, err := inject.MapperInto(d.mapper, handler); err != nil { - log.Error(err, "Failed to add mapper to dynamic enqueue request") - return err - } + _, exists := d.registrations[handler.Key()] if !exists { log.V(1).Info("Adding new handler registration", "key", handler.Key(), "current_registrations", d.registrations) @@ -103,46 +92,38 @@ func (d *DynamicEnqueueRequest) Registrations() []string { var _ handler.EventHandler = &DynamicEnqueueRequest{} // Create is called in response to a create event - e.g. Pod Creation. -func (d *DynamicEnqueueRequest) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (d *DynamicEnqueueRequest) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { d.mutex.RLock() defer d.mutex.RUnlock() for _, v := range d.registrations { - v.EventHandler().Create(evt, q) + v.EventHandler().Create(ctx, evt, q) } } // Update is called in response to an update event - e.g. Pod Updated. -func (d *DynamicEnqueueRequest) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (d *DynamicEnqueueRequest) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { d.mutex.RLock() defer d.mutex.RUnlock() for _, v := range d.registrations { - v.EventHandler().Update(evt, q) + v.EventHandler().Update(ctx, evt, q) } } // Delete is called in response to a delete event - e.g. Pod Deleted. -func (d *DynamicEnqueueRequest) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (d *DynamicEnqueueRequest) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { d.mutex.RLock() defer d.mutex.RUnlock() for _, v := range d.registrations { - v.EventHandler().Delete(evt, q) + v.EventHandler().Delete(ctx, evt, q) } } // Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or // external trigger request - e.g. reconcile Autoscaling, or a Webhook. -func (d *DynamicEnqueueRequest) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (d *DynamicEnqueueRequest) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { d.mutex.RLock() defer d.mutex.RUnlock() for _, v := range d.registrations { - v.EventHandler().Generic(evt, q) + v.EventHandler().Generic(ctx, evt, q) } } - -// InjectMapper is called by the Controller to provide the rest mapper used by the manager. -func (d *DynamicEnqueueRequest) InjectMapper(m meta.RESTMapper) error { - d.mapper = m - return nil -} - -var _ inject.Mapper = &DynamicEnqueueRequest{} diff --git a/pkg/controller/common/watches/handler_integration_test.go b/pkg/controller/common/watches/handler_integration_test.go index 3c91a89349..936dabf1f9 100644 --- a/pkg/controller/common/watches/handler_integration_test.go +++ b/pkg/controller/common/watches/handler_integration_test.go @@ -48,7 +48,7 @@ func TestDynamicEnqueueRequest(t *testing.T) { }) ctrl, err := controller.New("test-reconciler", mgr, controller.Options{Reconciler: reconcileFunc}) require.NoError(t, err) - require.NoError(t, ctrl.Watch(&source.Kind{Type: &corev1.Secret{}}, eventHandler)) + require.NoError(t, ctrl.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), eventHandler)) return nil } diff --git a/pkg/controller/common/watches/handler_test.go b/pkg/controller/common/watches/handler_test.go index 81333ccff6..4e6c997183 100644 --- a/pkg/controller/common/watches/handler_test.go +++ b/pkg/controller/common/watches/handler_test.go @@ -5,6 +5,7 @@ package watches import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -138,7 +139,7 @@ func TestDynamicEnqueueRequest_EventHandler(t *testing.T) { } d := NewDynamicEnqueueRequest() - require.NoError(t, d.InjectMapper(getRESTMapper())) + // require.NoError(t, d.InjectMapper(getRESTMapper())) q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) assertEmptyQueue := func() { @@ -159,7 +160,7 @@ func TestDynamicEnqueueRequest_EventHandler(t *testing.T) { assertEmptyQueue() // simulate an object creation - d.Create(event.CreateEvent{ + d.Create(context.Background(), event.CreateEvent{ Object: testObject1, }, q) assertEmptyQueue() @@ -173,31 +174,31 @@ func TestDynamicEnqueueRequest_EventHandler(t *testing.T) { assertEmptyQueue() // simulate first object creation - d.Create(event.CreateEvent{ + d.Create(context.Background(), event.CreateEvent{ Object: testObject1, }, q) assertReconcileReq(watching) // simulate object update - d.Update(event.UpdateEvent{ + d.Update(context.Background(), event.UpdateEvent{ ObjectOld: testObject1, ObjectNew: updated1, }, q) assertReconcileReq(watching) // simulate object deletion - d.Delete(event.DeleteEvent{ + d.Delete(context.Background(), event.DeleteEvent{ Object: testObject1, }, q) assertReconcileReq(watching) // simulate second object creation - d.Create(event.CreateEvent{ + d.Create(context.Background(), event.CreateEvent{ Object: testObject2, }, q) // no watcher, nothing in the queue assertEmptyQueue() // simulate second object update - d.Update(event.UpdateEvent{ + d.Update(context.Background(), event.UpdateEvent{ ObjectOld: testObject2, ObjectNew: updated2, }, q) @@ -211,12 +212,12 @@ func TestDynamicEnqueueRequest_EventHandler(t *testing.T) { Name: "test-watch-2", })) // simulate second object creation - d.Create(event.CreateEvent{ + d.Create(context.Background(), event.CreateEvent{ Object: testObject2, }, q) assertReconcileReq(watching) // simulate second object update - d.Update(event.UpdateEvent{ + d.Update(context.Background(), event.UpdateEvent{ ObjectOld: testObject2, ObjectNew: updated2, }, q) @@ -225,14 +226,14 @@ func TestDynamicEnqueueRequest_EventHandler(t *testing.T) { // remove the watch for object 2 d.RemoveHandlerForKey("test-watch-2") // simulate object update: nothing should happen - d.Update(event.UpdateEvent{ + d.Update(context.Background(), event.UpdateEvent{ ObjectOld: testObject2, ObjectNew: updated2, }, q) assertEmptyQueue() // updates on the first object should still work - d.Update(event.UpdateEvent{ + d.Update(context.Background(), event.UpdateEvent{ ObjectOld: testObject1, ObjectNew: updated1, }, q) @@ -246,14 +247,14 @@ func TestDynamicEnqueueRequest_EventHandler(t *testing.T) { })) // update on the first object should register - d.Update(event.UpdateEvent{ + d.Update(context.Background(), event.UpdateEvent{ ObjectOld: testObject1, ObjectNew: updated1, }, q) assertReconcileReq(watching) // update on the second object should register too - d.Update(event.UpdateEvent{ + d.Update(context.Background(), event.UpdateEvent{ ObjectOld: testObject2, ObjectNew: updated2, }, q) @@ -269,28 +270,28 @@ func TestDynamicEnqueueRequest_EventHandler(t *testing.T) { // setup an owner watch where owner is testObject1 require.NoError(t, d.AddHandler(&OwnerWatch{ - EnqueueRequestForOwner: handler.EnqueueRequestForOwner{ - OwnerType: testObject1, - IsController: true, - }, + Scheme: scheme.Scheme, + Mapper: getRESTMapper(), + OwnerType: testObject1, + IsController: true, })) // let's make object 1 the owner of object 2 require.NoError(t, controllerutil.SetControllerReference(testObject1, testObject2, scheme.Scheme)) // an update on object 2 should enqueue a request for object 1 (the owner) - d.Update(event.UpdateEvent{ + d.Update(context.Background(), event.UpdateEvent{ ObjectOld: testObject2, ObjectNew: updated2, }, q) assertReconcileReq(nsn1) // same for deletes - d.Delete(event.DeleteEvent{ + d.Delete(context.Background(), event.DeleteEvent{ Object: testObject2, }, q) assertReconcileReq(nsn1) // named watch on object 1 should still work - d.Create(event.CreateEvent{ + d.Create(context.Background(), event.CreateEvent{ Object: testObject1, }, q) assertReconcileReq(watching) @@ -303,7 +304,7 @@ func TestDynamicEnqueueRequest_EventHandler(t *testing.T) { Watcher: watching, Name: "test-watch-2", })) - d.Create(event.CreateEvent{ + d.Create(context.Background(), event.CreateEvent{ Object: testObject2, }, q) expected := []types.NamespacedName{ @@ -341,7 +342,6 @@ func TestDynamicEnqueueRequest_OwnerWatch(t *testing.T) { updated2.Labels = map[string]string{"updated": "2"} d := NewDynamicEnqueueRequest() - require.NoError(t, d.InjectMapper(getRESTMapper())) q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) assertEmptyQueue := func() { @@ -362,24 +362,24 @@ func TestDynamicEnqueueRequest_OwnerWatch(t *testing.T) { assertEmptyQueue() // setup an owner watch where owner is testObject1 require.NoError(t, d.AddHandler(&OwnerWatch{ - EnqueueRequestForOwner: handler.EnqueueRequestForOwner{ - OwnerType: testObject1, - IsController: true, - }, + OwnerType: testObject1, + IsController: true, + Scheme: scheme.Scheme, + Mapper: getRESTMapper(), })) // END FIXTURES require.NoError(t, controllerutil.SetControllerReference(testObject1, testObject2, scheme.Scheme)) - d.Create(event.CreateEvent{ + d.Create(context.Background(), event.CreateEvent{ Object: testObject1, }, q) - d.Create(event.CreateEvent{ + d.Create(context.Background(), event.CreateEvent{ Object: testObject2, }, q) // an update on object 2 should enqueue a request for object 1 (the owner) - d.Update(event.UpdateEvent{ + d.Update(context.Background(), event.UpdateEvent{ ObjectOld: testObject2, ObjectNew: updated2, }, q) diff --git a/pkg/controller/common/watches/named_watch.go b/pkg/controller/common/watches/named_watch.go index 676f27a1da..f2d2fa18e2 100644 --- a/pkg/controller/common/watches/named_watch.go +++ b/pkg/controller/common/watches/named_watch.go @@ -5,6 +5,8 @@ package watches import ( + "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" @@ -26,13 +28,13 @@ type NamedWatch struct { var _ handler.EventHandler = &NamedWatch{} -func (w NamedWatch) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { +func (w NamedWatch) Create(_ context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) { for _, req := range w.toReconcileRequest(evt.Object) { q.Add(req) } } -func (w NamedWatch) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { +func (w NamedWatch) Update(_ context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { for _, req := range w.toReconcileRequest(evt.ObjectOld) { q.Add(req) } @@ -41,13 +43,13 @@ func (w NamedWatch) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterf } } -func (w NamedWatch) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { +func (w NamedWatch) Delete(_ context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) { for _, req := range w.toReconcileRequest(evt.Object) { q.Add(req) } } -func (w NamedWatch) Generic(evt event.GenericEvent, q workqueue.RateLimitingInterface) { +func (w NamedWatch) Generic(_ context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) { for _, req := range w.toReconcileRequest(evt.Object) { q.Add(req) } diff --git a/pkg/controller/common/watches/owner_watch.go b/pkg/controller/common/watches/owner_watch.go index 556b1af9dc..4e7750447c 100644 --- a/pkg/controller/common/watches/owner_watch.go +++ b/pkg/controller/common/watches/owner_watch.go @@ -5,11 +5,17 @@ package watches import ( + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/handler" ) type OwnerWatch struct { - handler.EnqueueRequestForOwner + Scheme *runtime.Scheme + Mapper meta.RESTMapper + OwnerType client.Object + IsController bool } func (o *OwnerWatch) Key() string { @@ -17,7 +23,12 @@ func (o *OwnerWatch) Key() string { } func (o *OwnerWatch) EventHandler() handler.EventHandler { - return o + opts := []handler.OwnerOption{} + if o.IsController { + opts = []handler.OwnerOption{handler.OnlyControllerOwner()} + } + + return handler.EnqueueRequestForOwner(o.Scheme, o.Mapper, o.OwnerType, opts...) } var _ HandlerRegistration = &OwnerWatch{} diff --git a/pkg/controller/common/watches/pods.go b/pkg/controller/common/watches/pods.go index e8dbecb261..fb980c1e0a 100644 --- a/pkg/controller/common/watches/pods.go +++ b/pkg/controller/common/watches/pods.go @@ -5,27 +5,47 @@ package watches import ( + "context" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) +func Add(mgr manager.Manager, c controller.Controller, owner client.Object, objects ...client.Object) error { + if err := c.Watch( + source.Kind(mgr.GetCache(), owner), &handler.EnqueueRequestForObject{}, + ); err != nil { + return err + } + for _, object := range objects { + if err := c.Watch( + source.Kind(mgr.GetCache(), object), + handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), owner, handler.OnlyControllerOwner()), + ); err != nil { + return err + } + } + return nil +} + // WatchPods updates the given controller to enqueue reconciliation requests triggered by changes on Pods. // The resource to reconcile is identified by a label on the Pods. -func WatchPods(c controller.Controller, objNameLabel string) error { +func WatchPods(mgr manager.Manager, c controller.Controller, objNameLabel string) error { return c.Watch( - &source.Kind{Type: &corev1.Pod{}}, + source.Kind(mgr.GetCache(), &corev1.Pod{}), handler.EnqueueRequestsFromMapFunc(objToReconcileRequest(objNameLabel)), ) } // objToReconcileRequest returns a function to enqueue reconcile requests for the resource name set at objNameLabel. -func objToReconcileRequest(objNameLabel string) func(object client.Object) []reconcile.Request { - return func(object client.Object) []reconcile.Request { +func objToReconcileRequest(objNameLabel string) handler.MapFunc { + return func(ctx context.Context, object client.Object) []reconcile.Request { labels := object.GetLabels() objectName, isSet := labels[objNameLabel] if !isSet { diff --git a/pkg/controller/common/watches/pods_test.go b/pkg/controller/common/watches/pods_test.go index 4a2ccc9ff2..2a9108fff1 100644 --- a/pkg/controller/common/watches/pods_test.go +++ b/pkg/controller/common/watches/pods_test.go @@ -5,6 +5,7 @@ package watches import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -50,7 +51,7 @@ func Test_objToReconcileRequest(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := fn(tt.obj) + got := fn(context.Background(), tt.obj) require.Equal(t, tt.want, got) }) } diff --git a/pkg/controller/common/watches/secrets.go b/pkg/controller/common/watches/secrets.go index 3b1c7e8909..9146b5e3c2 100644 --- a/pkg/controller/common/watches/secrets.go +++ b/pkg/controller/common/watches/secrets.go @@ -5,11 +5,14 @@ package watches import ( + "context" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -63,15 +66,15 @@ func WatchUserProvidedNamespacedSecrets( } // WatchSoftOwnedSecrets triggers reconciliations on secrets referencing a soft owner. -func WatchSoftOwnedSecrets(c controller.Controller, ownerKind string) error { +func WatchSoftOwnedSecrets(mgr manager.Manager, c controller.Controller, ownerKind string) error { return c.Watch( - &source.Kind{Type: &corev1.Secret{}}, + source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestsFromMapFunc(reconcileReqForSoftOwner(ownerKind)), ) } func reconcileReqForSoftOwner(kind string) handler.MapFunc { - return func(object client.Object) []reconcile.Request { + return func(ctx context.Context, object client.Object) []reconcile.Request { softOwner, referenced := reconciler.SoftOwnerRefFromLabels(object.GetLabels()) if !referenced { return nil diff --git a/pkg/controller/common/watches/secrets_test.go b/pkg/controller/common/watches/secrets_test.go index befba1f890..4c07f64f56 100644 --- a/pkg/controller/common/watches/secrets_test.go +++ b/pkg/controller/common/watches/secrets_test.go @@ -5,6 +5,7 @@ package watches import ( + "context" "testing" "github.com/stretchr/testify/require" @@ -75,7 +76,7 @@ func Test_reconcileReqForSoftOwner(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - requests := toRequestsFunc(&tt.secret) + requests := toRequestsFunc(context.Background(), &tt.secret) require.Equal(t, tt.wantReconcileRequests, requests) }) } diff --git a/pkg/controller/elasticsearch/elasticsearch_controller.go b/pkg/controller/elasticsearch/elasticsearch_controller.go index 813b53c6b5..7a8631ab60 100644 --- a/pkg/controller/elasticsearch/elasticsearch_controller.go +++ b/pkg/controller/elasticsearch/elasticsearch_controller.go @@ -60,7 +60,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c, reconciler) + return addWatches(mgr, c, reconciler) } // newReconciler returns a new reconcile.Reconciler @@ -79,55 +79,55 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileEl } } -func addWatches(c controller.Controller, r *ReconcileElasticsearch) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileElasticsearch) error { // Watch for changes to Elasticsearch if err := c.Watch( - &source.Kind{Type: &esv1.Elasticsearch{}}, &handler.EnqueueRequestForObject{}, + source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}), &handler.EnqueueRequestForObject{}, ); err != nil { return err } // Watch StatefulSets if err := c.Watch( - &source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &esv1.Elasticsearch{}, - }, - ); err != nil { + source.Kind(mgr.GetCache(), &appsv1.StatefulSet{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &esv1.Elasticsearch{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch pods belonging to ES clusters - if err := watches.WatchPods(c, label.ClusterNameLabelName); err != nil { + if err := watches.WatchPods(mgr, c, label.ClusterNameLabelName); err != nil { return err } // Watch services - if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &esv1.Elasticsearch{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &esv1.Elasticsearch{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch config maps for dynamic watches (currently used for additional CAs trust) - if err := c.Watch(&source.Kind{Type: &corev1.ConfigMap{}}, r.dynamicWatches.ConfigMaps); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.ConfigMap{}), r.dynamicWatches.ConfigMaps); err != nil { return err } // Watch owned and soft-owned secrets - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.dynamicWatches.Secrets); err != nil { return err } if err := r.dynamicWatches.Secrets.AddHandler(&watches.OwnerWatch{ - EnqueueRequestForOwner: handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &esv1.Elasticsearch{}, - }, - }); err != nil { + IsController: true, + OwnerType: &esv1.Elasticsearch{}, + Scheme: mgr.GetScheme(), + Mapper: mgr.GetRESTMapper(), + }, + ); err != nil { return err } - if err := watches.WatchSoftOwnedSecrets(c, esv1.Kind); err != nil { + if err := watches.WatchSoftOwnedSecrets(mgr, c, esv1.Kind); err != nil { return err } diff --git a/pkg/controller/enterprisesearch/enterprisesearch_controller.go b/pkg/controller/enterprisesearch/enterprisesearch_controller.go index b9a8557f22..a357954865 100644 --- a/pkg/controller/enterprisesearch/enterprisesearch_controller.go +++ b/pkg/controller/enterprisesearch/enterprisesearch_controller.go @@ -52,7 +52,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c, reconciler) + return addWatches(mgr, c, reconciler) } // newReconciler returns a new reconcile.Reconciler @@ -66,48 +66,48 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileEn } } -func addWatches(c controller.Controller, r *ReconcileEnterpriseSearch) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileEnterpriseSearch) error { // Watch for changes to EnterpriseSearch - err := c.Watch(&source.Kind{Type: &entv1.EnterpriseSearch{}}, &handler.EnqueueRequestForObject{}) + err := c.Watch(source.Kind(mgr.GetCache(), &entv1.EnterpriseSearch{}), &handler.EnqueueRequestForObject{}) if err != nil { return err } // Watch Deployments - if err := c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &entv1.EnterpriseSearch{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &entv1.EnterpriseSearch{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch Pods, to ensure `status.version` and version upgrades are correctly reconciled on any change. // Watching Deployments only may lead to missing some events. - if err := watches.WatchPods(c, EnterpriseSearchNameLabelName); err != nil { + if err := watches.WatchPods(mgr, c, EnterpriseSearchNameLabelName); err != nil { return err } // Watch services - if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &entv1.EnterpriseSearch{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &entv1.EnterpriseSearch{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch owned and soft-owned secrets - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &entv1.EnterpriseSearch{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &entv1.EnterpriseSearch{}, handler.OnlyControllerOwner(), + )); err != nil { return err } - if err := watches.WatchSoftOwnedSecrets(c, entv1.Kind); err != nil { + if err := watches.WatchSoftOwnedSecrets(mgr, c, entv1.Kind); err != nil { return err } // Dynamically watch referenced secrets to connect to Elasticsearch - return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets) + return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.dynamicWatches.Secrets) } var _ reconcile.Reconciler = &ReconcileEnterpriseSearch{} diff --git a/pkg/controller/kibana/controller.go b/pkg/controller/kibana/controller.go index 83b0fdb6d0..12cd562d49 100644 --- a/pkg/controller/kibana/controller.go +++ b/pkg/controller/kibana/controller.go @@ -49,7 +49,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c, reconciler) + return addWatches(mgr, c, reconciler) } // newReconciler returns a new reconcile.Reconciler @@ -62,47 +62,46 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileKi } } -func addWatches(c controller.Controller, r *ReconcileKibana) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileKibana) error { // Watch for changes to Kibana - if err := c.Watch(&source.Kind{Type: &kbv1.Kibana{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &kbv1.Kibana{}), &handler.EnqueueRequestForObject{}); err != nil { return err } // Watch deployments - if err := c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &kbv1.Kibana{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &kbv1.Kibana{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch Pods, to ensure `status.version` and version upgrades are correctly reconciled on any change. // Watching Deployments only may lead to missing some events. - if err := watches.WatchPods(c, KibanaNameLabelName); err != nil { + if err := watches.WatchPods(mgr, c, KibanaNameLabelName); err != nil { return err } // Watch services - if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &kbv1.Kibana{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &kbv1.Kibana{}, handler.OnlyControllerOwner(), + )); err != nil { return err } - // Watch owned and soft-owned secrets - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &kbv1.Kibana{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &kbv1.Kibana{}, handler.OnlyControllerOwner(), + )); err != nil { return err } - if err := watches.WatchSoftOwnedSecrets(c, kbv1.Kind); err != nil { + if err := watches.WatchSoftOwnedSecrets(mgr, c, kbv1.Kind); err != nil { return err } // dynamically watch referenced secrets to connect to Elasticsearch - return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets) + return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.dynamicWatches.Secrets) } var _ reconcile.Reconciler = &ReconcileKibana{} diff --git a/pkg/controller/license/license_controller.go b/pkg/controller/license/license_controller.go index 1a85a916bd..5bd7d933c8 100644 --- a/pkg/controller/license/license_controller.go +++ b/pkg/controller/license/license_controller.go @@ -72,7 +72,7 @@ func Add(mgr manager.Manager, p operator.Parameters) error { if err != nil { return err } - return addWatches(c, r.Client) + return addWatches(mgr, c, r.Client) } // newReconciler returns a new reconcile.Reconciler @@ -108,37 +108,38 @@ func nextReconcileRelativeTo(now, expiry time.Time, safety time.Duration) reconc } // addWatches adds a new Controller to mgr with r as the reconcile.Reconciler -func addWatches(c controller.Controller, k8sClient k8s.Client) error { +func addWatches(mgr manager.Manager, c controller.Controller, k8sClient k8s.Client) error { log := ulog.Log // no context available for contextual logging // Watch for changes to Elasticsearch clusters. if err := c.Watch( - &source.Kind{Type: &esv1.Elasticsearch{}}, &handler.EnqueueRequestForObject{}, + source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}), &handler.EnqueueRequestForObject{}, ); err != nil { return err } - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, handler.EnqueueRequestsFromMapFunc(func(object client.Object) []reconcile.Request { - secret, ok := object.(*corev1.Secret) - if !ok { - log.Error( - pkgerrors.Errorf("unexpected object type %T in watch handler, expected Secret", object), - "dropping watch event due to error in handler") - return nil - } - if !license.IsOperatorLicense(*secret) { - return nil - } + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) []reconcile.Request { + secret, ok := object.(*corev1.Secret) + if !ok { + log.Error( + pkgerrors.Errorf("unexpected object type %T in watch handler, expected Secret", object), + "dropping watch event due to error in handler") + return nil + } + if !license.IsOperatorLicense(*secret) { + return nil + } - // if a license is added/modified we want to update for potentially all clusters managed by this instance - // of ECK which is why we are listing all Elasticsearch clusters here and trigger a reconciliation - rs, err := reconcileRequestsForAllClusters(k8sClient, log) - if err != nil { - // dropping the event(s) at this point - log.Error(err, "failed to list affected clusters in enterprise license watch") - return nil - } - return rs - }), + // if a license is added/modified we want to update for potentially all clusters managed by this instance + // of ECK which is why we are listing all Elasticsearch clusters here and trigger a reconciliation + rs, err := reconcileRequestsForAllClusters(k8sClient, log) + if err != nil { + // dropping the event(s) at this point + log.Error(err, "failed to list affected clusters in enterprise license watch") + return nil + } + return rs + }), ); err != nil { return err } diff --git a/pkg/controller/license/license_controller_integration_test.go b/pkg/controller/license/license_controller_integration_test.go index 6a06a1904a..b9d996fb70 100644 --- a/pkg/controller/license/license_controller_integration_test.go +++ b/pkg/controller/license/license_controller_integration_test.go @@ -50,7 +50,7 @@ func TestReconcile(t *testing.T) { if err != nil { return err } - return addWatches(c, r.Client) + return addWatches(mgr, c, r.Client) }, operator.Parameters{}) defer stop() diff --git a/pkg/controller/license/trial/trial_controller.go b/pkg/controller/license/trial/trial_controller.go index a3c636af25..bbc2445179 100644 --- a/pkg/controller/license/trial/trial_controller.go +++ b/pkg/controller/license/trial/trial_controller.go @@ -232,37 +232,38 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileTr } } -func addWatches(c controller.Controller) error { +func addWatches(mgr manager.Manager, c controller.Controller) error { // Watch the trial status secret and the enterprise trial licenses as well - return c.Watch(&source.Kind{Type: &corev1.Secret{}}, handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request { - secret, ok := obj.(*corev1.Secret) - if !ok { - // no contextual logging available - ulog.Log.Error(fmt.Errorf("object of type %T in secret watch", obj), "dropping event due to type error") - } - if licensing.IsEnterpriseTrial(*secret) { + return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), + handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request { + secret, ok := obj.(*corev1.Secret) + if !ok { + // no contextual logging available + ulog.Log.Error(fmt.Errorf("object of type %T in secret watch", obj), "dropping event due to type error") + } + if licensing.IsEnterpriseTrial(*secret) { + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + }, + }, + } + } + + if obj.GetName() != licensing.TrialStatusSecretKey { + return nil + } return []reconcile.Request{ { NamespacedName: types.NamespacedName{ - Namespace: obj.GetNamespace(), - Name: obj.GetName(), + Namespace: secret.Annotations[licensing.TrialLicenseSecretNamespace], + Name: secret.Annotations[licensing.TrialLicenseSecretName], }, }, } - } - - if obj.GetName() != licensing.TrialStatusSecretKey { - return nil - } - return []reconcile.Request{ - { - NamespacedName: types.NamespacedName{ - Namespace: secret.Annotations[licensing.TrialLicenseSecretNamespace], - Name: secret.Annotations[licensing.TrialLicenseSecretName], - }, - }, - } - }), + }), ) } @@ -274,7 +275,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c) + return addWatches(mgr, c) } var _ reconcile.Reconciler = &ReconcileTrials{} diff --git a/pkg/controller/logstash/logstash_controller.go b/pkg/controller/logstash/logstash_controller.go index 0abf64419d..f1121fbd7e 100644 --- a/pkg/controller/logstash/logstash_controller.go +++ b/pkg/controller/logstash/logstash_controller.go @@ -44,7 +44,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c, r) + return addWatches(mgr, c, r) } // newReconciler returns a new reconcile.Reconciler. @@ -59,49 +59,48 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileLo } // addWatches adds watches for all resources this controller cares about -func addWatches(c controller.Controller, r *ReconcileLogstash) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileLogstash) error { // Watch for changes to Logstash - if err := c.Watch(&source.Kind{Type: &logstashv1alpha1.Logstash{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &logstashv1alpha1.Logstash{}), &handler.EnqueueRequestForObject{}); err != nil { return err } // Watch StatefulSets if err := c.Watch( - &source.Kind{Type: &appsv1.StatefulSet{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &logstashv1alpha1.Logstash{}, - }, - ); err != nil { + source.Kind(mgr.GetCache(), &appsv1.StatefulSet{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &logstashv1alpha1.Logstash{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch Pods, to ensure `status.version` is correctly reconciled on any change. // Watching StatefulSets only may lead to missing some events. - if err := watches.WatchPods(c, NameLabelName); err != nil { + if err := watches.WatchPods(mgr, c, NameLabelName); err != nil { return err } // Watch services - if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &logstashv1alpha1.Logstash{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &logstashv1alpha1.Logstash{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch owned and soft-owned secrets - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &logstashv1alpha1.Logstash{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &logstashv1alpha1.Logstash{}, handler.OnlyControllerOwner(), + )); err != nil { return err } - if err := watches.WatchSoftOwnedSecrets(c, logstashv1alpha1.Kind); err != nil { + if err := watches.WatchSoftOwnedSecrets(mgr, c, logstashv1alpha1.Kind); err != nil { return err } // Watch dynamically referenced Secrets - return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets) + return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.dynamicWatches.Secrets) } var _ reconcile.Reconciler = &ReconcileLogstash{} diff --git a/pkg/controller/maps/controller.go b/pkg/controller/maps/controller.go index 07ab717d88..0a336472c4 100644 --- a/pkg/controller/maps/controller.go +++ b/pkg/controller/maps/controller.go @@ -56,7 +56,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c, reconciler) + return addWatches(mgr, c, reconciler) } // newReconciler returns a new reconcile.Reconciler @@ -71,47 +71,47 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileMa } } -func addWatches(c controller.Controller, r *ReconcileMapsServer) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileMapsServer) error { // Watch for changes to MapsServer - if err := c.Watch(&source.Kind{Type: &emsv1alpha1.ElasticMapsServer{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &emsv1alpha1.ElasticMapsServer{}), &handler.EnqueueRequestForObject{}); err != nil { return err } // Watch deployments - if err := c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &emsv1alpha1.ElasticMapsServer{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &emsv1alpha1.ElasticMapsServer{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch Pods, to ensure `status.version` and version upgrades are correctly reconciled on any change. // Watching Deployments only may lead to missing some events. - if err := watches.WatchPods(c, NameLabelName); err != nil { + if err := watches.WatchPods(mgr, c, NameLabelName); err != nil { return err } // Watch services - if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &emsv1alpha1.ElasticMapsServer{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &emsv1alpha1.ElasticMapsServer{}, handler.OnlyControllerOwner(), + )); err != nil { return err } // Watch owned and soft-owned secrets - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &emsv1alpha1.ElasticMapsServer{}, - }); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner( + mgr.GetScheme(), mgr.GetRESTMapper(), + &emsv1alpha1.ElasticMapsServer{}, handler.OnlyControllerOwner(), + )); err != nil { return err } - if err := watches.WatchSoftOwnedSecrets(c, emsv1alpha1.Kind); err != nil { + if err := watches.WatchSoftOwnedSecrets(mgr, c, emsv1alpha1.Kind); err != nil { return err } // Dynamically watch referenced secrets to connect to Elasticsearch - return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets) + return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.dynamicWatches.Secrets) } var _ reconcile.Reconciler = &ReconcileMapsServer{} diff --git a/pkg/controller/remoteca/controller.go b/pkg/controller/remoteca/controller.go index 4f3d15c1a6..b60337b06d 100644 --- a/pkg/controller/remoteca/controller.go +++ b/pkg/controller/remoteca/controller.go @@ -51,7 +51,7 @@ func Add(mgr manager.Manager, accessReviewer rbac.AccessReviewer, params operato if err != nil { return err } - return AddWatches(c, r) + return addWatches(mgr, c, r) } // NewReconciler returns a new reconcile.Reconciler diff --git a/pkg/controller/remoteca/watches.go b/pkg/controller/remoteca/watches.go index 69fee8a495..c41f6c796b 100644 --- a/pkg/controller/remoteca/watches.go +++ b/pkg/controller/remoteca/watches.go @@ -5,6 +5,7 @@ package remoteca import ( + "context" "fmt" v1 "k8s.io/api/core/v1" @@ -12,6 +13,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" @@ -24,28 +26,31 @@ import ( ) // AddWatches set watches on objects needed to manage the association between a local and a remote cluster. -func AddWatches(c controller.Controller, r *ReconcileRemoteCa) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileRemoteCa) error { // Watch for changes to RemoteCluster - if err := c.Watch(&source.Kind{Type: &esv1.Elasticsearch{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}), &handler.EnqueueRequestForObject{}); err != nil { return err } // Watch Secrets that contain remote certificate authorities managed by this controller - if err := c.Watch(&source.Kind{Type: &v1.Secret{}}, handler.EnqueueRequestsFromMapFunc(newRequestsFromMatchedLabels())); err != nil { + if err := c.Watch( + source.Kind(mgr.GetCache(), &v1.Secret{}), + handler.EnqueueRequestsFromMapFunc(newRequestsFromMatchedLabels()), + ); err != nil { return err } // Dynamically watches the certificate authorities involved in a cluster relationship - if err := c.Watch(&source.Kind{Type: &v1.Secret{}}, r.watches.Secrets); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &v1.Secret{}), r.watches.Secrets); err != nil { return err } return r.watches.Secrets.AddHandlers( &watches.OwnerWatch{ - EnqueueRequestForOwner: handler.EnqueueRequestForOwner{ - IsController: true, - OwnerType: &esv1.Elasticsearch{}, - }, + Scheme: mgr.GetScheme(), + Mapper: mgr.GetRESTMapper(), + OwnerType: &esv1.Elasticsearch{}, + IsController: true, }, ) } @@ -53,7 +58,7 @@ func AddWatches(c controller.Controller, r *ReconcileRemoteCa) error { // newRequestsFromMatchedLabels creates a watch handler function that creates reconcile requests based on the // labels set on a Secret which contains the remote CA. func newRequestsFromMatchedLabels() handler.MapFunc { - return func(obj client.Object) []reconcile.Request { + return func(ctx context.Context, obj client.Object) []reconcile.Request { labels := obj.GetLabels() if !maps.ContainsKeys(labels, RemoteClusterNameLabelName, RemoteClusterNamespaceLabelName, commonv1.TypeLabelName) { return nil diff --git a/pkg/controller/stackconfigpolicy/controller.go b/pkg/controller/stackconfigpolicy/controller.go index 1fd4ca30e8..a1cfaf3f58 100644 --- a/pkg/controller/stackconfigpolicy/controller.go +++ b/pkg/controller/stackconfigpolicy/controller.go @@ -59,7 +59,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error { if err != nil { return err } - return addWatches(c, r) + return addWatches(mgr, c, r) } // newReconciler returns a new reconcile.Reconciler of StackConfigPolicy. @@ -74,23 +74,23 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileSt } } -func addWatches(c controller.Controller, r *ReconcileStackConfigPolicy) error { +func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileStackConfigPolicy) error { // watch for changes to StackConfigPolicy - if err := c.Watch(&source.Kind{Type: &policyv1alpha1.StackConfigPolicy{}}, &handler.EnqueueRequestForObject{}); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &policyv1alpha1.StackConfigPolicy{}), &handler.EnqueueRequestForObject{}); err != nil { return err } // watch for changes to Elasticsearch and reconcile all StackConfigPolicy - if err := c.Watch(&source.Kind{Type: &esv1.Elasticsearch{}}, r.reconcileRequestForAllPolicies()); err != nil { + if err := c.Watch(source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}), r.reconcileRequestForAllPolicies()); err != nil { return err } // watch Secrets soft owned by StackConfigPolicy - return c.Watch(&source.Kind{Type: &corev1.Secret{}}, reconcileRequestForSoftOwnerPolicy()) + return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), reconcileRequestForSoftOwnerPolicy()) } func reconcileRequestForSoftOwnerPolicy() handler.EventHandler { - return handler.EnqueueRequestsFromMapFunc(func(object client.Object) []reconcile.Request { + return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, object client.Object) []reconcile.Request { softOwner, referenced := reconciler.SoftOwnerRefFromLabels(object.GetLabels()) if !referenced || softOwner.Kind != policyv1alpha1.Kind { return nil @@ -103,7 +103,7 @@ func reconcileRequestForSoftOwnerPolicy() handler.EventHandler { // requestsAllStackConfigPolicies returns the requests to reconcile all StackConfigPolicy resources. func (r *ReconcileStackConfigPolicy) reconcileRequestForAllPolicies() handler.EventHandler { - return handler.EnqueueRequestsFromMapFunc(func(es client.Object) []reconcile.Request { + return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, es client.Object) []reconcile.Request { var stackConfigList policyv1alpha1.StackConfigPolicyList err := r.Client.List(context.Background(), &stackConfigList) if err != nil { diff --git a/pkg/controller/webhook/webhook_certificates_controller.go b/pkg/controller/webhook/webhook_certificates_controller.go index d453b56887..effaa1865a 100644 --- a/pkg/controller/webhook/webhook_certificates_controller.go +++ b/pkg/controller/webhook/webhook_certificates_controller.go @@ -112,7 +112,7 @@ func Add(mgr manager.Manager, webhookParams Params, clientset kubernetes.Interfa Name: webhookParams.SecretName, } - if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &watches.NamedWatch{ + if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), &watches.NamedWatch{ Name: "webhook-server-cert", Watched: []types.NamespacedName{secret}, Watcher: secret, @@ -124,7 +124,7 @@ func Add(mgr manager.Manager, webhookParams Params, clientset kubernetes.Interfa Name: webhookParams.Name, } - return c.Watch(&source.Kind{Type: webhook.getType()}, &watches.NamedWatch{ + return c.Watch(source.Kind(mgr.GetCache(), webhook.getType()), &watches.NamedWatch{ Name: "validatingwebhookconfiguration", Watched: []types.NamespacedName{webhookConfiguration}, Watcher: webhookConfiguration, diff --git a/pkg/utils/k8s/fake.go b/pkg/utils/k8s/fake.go index 10e466ac7a..bfe97b7c7a 100644 --- a/pkg/utils/k8s/fake.go +++ b/pkg/utils/k8s/fake.go @@ -9,6 +9,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" @@ -28,22 +29,30 @@ type failingSubClient struct { err error } -func (fc failingSubClient) Create(ctx context.Context, obj client.Object, subResource client.Object, opts ...client.SubResourceCreateOption) error { +func (fc failingSubClient) Create(_ context.Context, _ client.Object, subResource client.Object, _ ...client.SubResourceCreateOption) error { return fc.err } -func (fc failingSubClient) Get(ctx context.Context, obj client.Object, subResource client.Object, opts ...client.SubResourceGetOption) error { +func (fc failingSubClient) Get(_ context.Context, _ client.Object, subResource client.Object, _ ...client.SubResourceGetOption) error { return fc.err } -func (fc failingSubClient) Update(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error { +func (fc failingSubClient) Update(_ context.Context, _ client.Object, _ ...client.SubResourceUpdateOption) error { return fc.err } -func (fc failingSubClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error { +func (fc failingSubClient) Patch(_ context.Context, _ client.Object, patch client.Patch, _ ...client.SubResourcePatchOption) error { return fc.err } +func (fc failingSubClient) GroupVersionKindFor(obj runtime.Object) (schema.GroupVersionKind, error) { + return schema.GroupVersionKind{}, fc.err +} + +func (fc failingSubClient) IsObjectNamespaced(obj runtime.Object) (bool, error) { + return false, fc.err +} + type failingClient struct { failingSubClient err error @@ -54,31 +63,31 @@ func NewFailingClient(err error) Client { return failingClient{err: err} } -func (fc failingClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error { +func (fc failingClient) Get(_ context.Context, key client.ObjectKey, _ client.Object, _ ...client.GetOption) error { return fc.err } -func (fc failingClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error { +func (fc failingClient) List(_ context.Context, list client.ObjectList, _ ...client.ListOption) error { return fc.err } -func (fc failingClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error { +func (fc failingClient) Create(_ context.Context, _ client.Object, _ ...client.CreateOption) error { return fc.err } -func (fc failingClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error { +func (fc failingClient) Delete(_ context.Context, _ client.Object, _ ...client.DeleteOption) error { return fc.err } -func (fc failingClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error { +func (fc failingClient) Update(_ context.Context, _ client.Object, _ ...client.UpdateOption) error { return fc.err } -func (fc failingClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error { +func (fc failingClient) Patch(_ context.Context, _ client.Object, patch client.Patch, _ ...client.PatchOption) error { return fc.err } -func (fc failingClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { +func (fc failingClient) DeleteAllOf(_ context.Context, _ client.Object, _ ...client.DeleteAllOfOption) error { return fc.err }