Skip to content

Commit

Permalink
refactoring watches
Browse files Browse the repository at this point in the history
  • Loading branch information
thbkrkr committed May 30, 2023
1 parent 3a509ca commit 02d3312
Show file tree
Hide file tree
Showing 28 changed files with 364 additions and 320 deletions.
51 changes: 30 additions & 21 deletions pkg/controller/agent/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error {
if err != nil {
return err
}
return addWatches(c, r)
return addWatches(mgr, c, r)
}

// newReconciler returns a new reconcile.Reconciler.
Expand All @@ -58,53 +58,62 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileAg
}

// addWatches adds watches for all resources this controller cares about
func addWatches(c controller.Controller, r *ReconcileAgent) error {
func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileAgent) error {
// Watch for changes to Agent
if err := c.Watch(&source.Kind{Type: &agentv1alpha1.Agent{}}, &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(
source.Kind(mgr.GetCache(), &agentv1alpha1.Agent{}),
&handler.EnqueueRequestForObject{}); err != nil {
return err
}

// Watch DaemonSets
if err := c.Watch(&source.Kind{Type: &appsv1.DaemonSet{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &agentv1alpha1.Agent{},
}); err != nil {
if err := c.Watch(
source.Kind(mgr.GetCache(), &appsv1.DaemonSet{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(),
&agentv1alpha1.Agent{}, handler.OnlyControllerOwner()),
); err != nil {
return err
}

// Watch Deployments
if err := c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &agentv1alpha1.Agent{},
}); err != nil {
if err := c.Watch(
source.Kind(mgr.GetCache(), &appsv1.Deployment{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(),
&agentv1alpha1.Agent{}, handler.OnlyControllerOwner()),
); err != nil {
return err
}

// Watch Pods, to ensure `status.version` is correctly reconciled on any change.
// Watching Deployments or DaemonSets only may lead to missing some events.
if err := watches.WatchPods(c, NameLabelName); err != nil {
if err := watches.WatchPods(mgr, c, NameLabelName); err != nil {
return err
}

// Watch Secrets
if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &agentv1alpha1.Agent{},
}); err != nil {
if err := c.Watch(
source.Kind(mgr.GetCache(), &corev1.Secret{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(),
&agentv1alpha1.Agent{}, handler.OnlyControllerOwner()),
); err != nil {
return err
}

// Watch services - Agent in Fleet mode with Fleet Server enabled configures and exposes a Service
// for Elastic Agents to connect to.
if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &agentv1alpha1.Agent{},
}); err != nil {
if err := c.Watch(
source.Kind(mgr.GetCache(), &corev1.Service{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(),
&agentv1alpha1.Agent{}, handler.OnlyControllerOwner()),
); err != nil {
return err
}

// Watch dynamically referenced Secrets
return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets)
return c.Watch(
source.Kind(mgr.GetCache(), &corev1.Secret{}),
r.dynamicWatches.Secrets,
)
}

var _ reconcile.Reconciler = &ReconcileAgent{}
Expand Down
36 changes: 18 additions & 18 deletions pkg/controller/apmserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error {
if err != nil {
return err
}
return addWatches(c, reconciler)
return addWatches(mgr, c, reconciler)
}

// newReconciler returns a new reconcile.Reconciler
Expand All @@ -96,48 +96,48 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileAp
}
}

func addWatches(c controller.Controller, r *ReconcileApmServer) error {
func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileApmServer) error {
// Watch for changes to ApmServer
err := c.Watch(&source.Kind{Type: &apmv1.ApmServer{}}, &handler.EnqueueRequestForObject{})
err := c.Watch(source.Kind(mgr.GetCache(), &apmv1.ApmServer{}), &handler.EnqueueRequestForObject{})
if err != nil {
return err
}

// Watch Deployments
if err := c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &apmv1.ApmServer{},
}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestForOwner(
mgr.GetScheme(), mgr.GetRESTMapper(),
&apmv1.ApmServer{}, handler.OnlyControllerOwner(),
)); err != nil {
return err
}

// Watch Pods, to ensure `status.version` and version upgrades are correctly reconciled on any change.
// Watching Deployments only may lead to missing some events.
if err := watches.WatchPods(c, ApmServerNameLabelName); err != nil {
if err := watches.WatchPods(mgr, c, ApmServerNameLabelName); err != nil {
return err
}

// Watch services
if err := c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &apmv1.ApmServer{},
}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), handler.EnqueueRequestForOwner(
mgr.GetScheme(), mgr.GetRESTMapper(),
&apmv1.ApmServer{}, handler.OnlyControllerOwner(),
)); err != nil {
return err
}

// Watch owned and soft-owned secrets
if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &apmv1.ApmServer{},
}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner(
mgr.GetScheme(), mgr.GetRESTMapper(),
&apmv1.ApmServer{}, handler.OnlyControllerOwner(),
)); err != nil {
return err
}
if err := watches.WatchSoftOwnedSecrets(c, apmv1.Kind); err != nil {
if err := watches.WatchSoftOwnedSecrets(mgr, c, apmv1.Kind); err != nil {
return err
}

// dynamically watch referenced secrets to connect to Elasticsearch
return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets)
return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.dynamicWatches.Secrets)
}

var _ reconcile.Reconciler = &ReconcileApmServer{}
Expand Down
20 changes: 10 additions & 10 deletions pkg/controller/association/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,33 +37,33 @@ func AddAssociationController(
if err != nil {
return err
}
return addWatches(c, r)
return addWatches(mgr, c, r)
}

func addWatches(c controller.Controller, r *Reconciler) error {
func addWatches(mgr manager.Manager, c controller.Controller, r *Reconciler) error {
// Watch the associated resource (e.g. Kibana for a Kibana -> Elasticsearch association)
if err := c.Watch(&source.Kind{Type: r.AssociatedObjTemplate()}, &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), r.AssociatedObjTemplate()), &handler.EnqueueRequestForObject{}); err != nil {
return err
}

// Watch Secrets owned by the associated resource
if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{
OwnerType: r.AssociatedObjTemplate(),
IsController: true,
}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner(
mgr.GetScheme(), mgr.GetRESTMapper(),
r.AssociatedObjTemplate(), handler.OnlyControllerOwner(),
)); err != nil {
return err
}

// Dynamically watch the referenced resources (e.g. Elasticsearch B for a Kibana A -> Elasticsearch B association)
if err := c.Watch(&source.Kind{Type: r.ReferencedObjTemplate()}, r.watches.ReferencedResources); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), r.ReferencedObjTemplate()), r.watches.ReferencedResources); err != nil {
return err
}

// Dynamically watch Secrets (CA Secret of the referenced resource, ES user secret or custom referenced object secret)
if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.watches.Secrets); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.watches.Secrets); err != nil {
return err
}

// Dynamically watch Service objects for custom services setup by the user
return c.Watch(&source.Kind{Type: &corev1.Service{}}, r.watches.Services)
return c.Watch(source.Kind(mgr.GetCache(), &corev1.Service{}), r.watches.Services)
}
6 changes: 3 additions & 3 deletions pkg/controller/autoscaling/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func Add(mgr manager.Manager, p operator.Parameters) error {
}

// The deprecated/legacy controller watches for changes on Elasticsearch clusters.
if err := legacyController.Watch(&source.Kind{Type: &esv1.Elasticsearch{}}, &handler.EnqueueRequestForObject{}); err != nil {
if err := legacyController.Watch(source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}), &handler.EnqueueRequestForObject{}); err != nil {
return err
}

Expand All @@ -36,8 +36,8 @@ func Add(mgr manager.Manager, p operator.Parameters) error {
if err != nil {
return err
}
if err := controller.Watch(&source.Kind{Type: &v1alpha1.ElasticsearchAutoscaler{}}, &handler.EnqueueRequestForObject{}); err != nil {
if err := controller.Watch(source.Kind(mgr.GetCache(), &v1alpha1.ElasticsearchAutoscaler{}), &handler.EnqueueRequestForObject{}); err != nil {
return err
}
return controller.Watch(&source.Kind{Type: &esv1.Elasticsearch{}}, reconciler.Watches.ReferencedResources)
return controller.Watch(source.Kind(mgr.GetCache(), &esv1.Elasticsearch{}), reconciler.Watches.ReferencedResources)
}
36 changes: 18 additions & 18 deletions pkg/controller/beat/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func Add(mgr manager.Manager, params operator.Parameters) error {
if err != nil {
return err
}
return addWatches(c, r)
return addWatches(mgr, c, r)
}

// newReconciler returns a new reconcile.Reconciler.
Expand All @@ -67,47 +67,47 @@ func newReconciler(mgr manager.Manager, params operator.Parameters) *ReconcileBe
}

// addWatches adds watches for all resources this controller cares about
func addWatches(c controller.Controller, r *ReconcileBeat) error {
func addWatches(mgr manager.Manager, c controller.Controller, r *ReconcileBeat) error {
// Watch for changes to Beat
if err := c.Watch(&source.Kind{Type: &beatv1beta1.Beat{}}, &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &beatv1beta1.Beat{}), &handler.EnqueueRequestForObject{}); err != nil {
return err
}

// Watch DaemonSets
if err := c.Watch(&source.Kind{Type: &appsv1.DaemonSet{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &beatv1beta1.Beat{},
}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.DaemonSet{}), handler.EnqueueRequestForOwner(
mgr.GetScheme(), mgr.GetRESTMapper(),
&beatv1beta1.Beat{}, handler.OnlyControllerOwner(),
)); err != nil {
return err
}

// Watch Deployments
if err := c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &beatv1beta1.Beat{},
}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.Deployment{}), handler.EnqueueRequestForOwner(
mgr.GetScheme(), mgr.GetRESTMapper(),
&beatv1beta1.Beat{}, handler.OnlyControllerOwner(),
)); err != nil {
return err
}

// Watch Pods, to ensure `status.version` is correctly reconciled on any change.
// Watching Deployments or DaemonSets only may lead to missing some events.
if err := watches.WatchPods(c, beatcommon.NameLabelName); err != nil {
if err := watches.WatchPods(mgr, c, beatcommon.NameLabelName); err != nil {
return err
}

// Watch owned and soft-owned Secrets
if err := c.Watch(&source.Kind{Type: &corev1.Secret{}}, &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &beatv1beta1.Beat{},
}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), handler.EnqueueRequestForOwner(
mgr.GetScheme(), mgr.GetRESTMapper(),
&beatv1beta1.Beat{}, handler.OnlyControllerOwner(),
)); err != nil {
return err
}
if err := watches.WatchSoftOwnedSecrets(c, beatv1beta1.Kind); err != nil {
if err := watches.WatchSoftOwnedSecrets(mgr, c, beatv1beta1.Kind); err != nil {
return err
}

// Watch dynamically referenced Secrets
return c.Watch(&source.Kind{Type: &corev1.Secret{}}, r.dynamicWatches.Secrets)
return c.Watch(source.Kind(mgr.GetCache(), &corev1.Secret{}), r.dynamicWatches.Secrets)
}

var _ reconcile.Reconciler = &ReconcileBeat{}
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/common/reconciler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
package reconciler

import (
"context"

"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand All @@ -14,7 +16,7 @@ import (
// GenericEventHandler returns an EventHandler that enqueues a reconciliation request
// from the generic event NamespacedName.
func GenericEventHandler() handler.EventHandler {
return handler.EnqueueRequestsFromMapFunc(func(obj client.Object) []reconcile.Request {
return handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
return []reconcile.Request{
{
NamespacedName: types.NamespacedName{Namespace: obj.GetNamespace(), Name: obj.GetName()},
Expand Down
Loading

0 comments on commit 02d3312

Please sign in to comment.