Skip to content

Commit

Permalink
Refactor: Extract RabbitMQ management client & provide fakes (#89)
Browse files Browse the repository at this point in the history
* Add factory to create rabbitmq clients

Allows for fakes to be used in integration tests to mock out the
rabbitmq server being referenced by the controller

* Add integration tests for generating management client from RMQCluster

Also implement the ClientFactory in the controllers
  • Loading branch information
coro authored Mar 30, 2021
1 parent 5e2b85b commit aed2478
Show file tree
Hide file tree
Showing 17 changed files with 1,714 additions and 190 deletions.
24 changes: 13 additions & 11 deletions controllers/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@ import (
"context"
"encoding/json"
"errors"
"reflect"
"time"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -36,9 +37,10 @@ const bindingFinalizer = "deletion.finalizers.bindings.rabbitmq.com"
// BindingReconciler reconciles a Binding object
type BindingReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
}

// +kubebuilder:rbac:groups=rabbitmq.com,resources=bindings,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -52,12 +54,12 @@ func (r *BindingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return reconcile.Result{}, client.IgnoreNotFound(err)
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, binding.Spec.RabbitmqClusterReference, binding.Namespace)
rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, binding.Spec.RabbitmqClusterReference, binding.Namespace)

if errors.Is(err, NoSuchRabbitmqClusterError) && binding.ObjectMeta.DeletionTimestamp.IsZero() {
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && binding.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -101,7 +103,7 @@ func (r *BindingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

func (r *BindingReconciler) declareBinding(ctx context.Context, client *rabbithole.Client, binding *topology.Binding) error {
func (r *BindingReconciler) declareBinding(ctx context.Context, client internal.RabbitMQClient, binding *topology.Binding) error {
logger := ctrl.LoggerFrom(ctx)

info, err := internal.GenerateBindingInfo(binding)
Expand All @@ -128,7 +130,7 @@ func (r *BindingReconciler) declareBinding(ctx context.Context, client *rabbitho
// when server responds with '404' Not Found, it logs and does not requeue on error
// if no binding argument is set, generating properties key by using internal.GeneratePropertiesKey
// if binding arguments are set, list all bindings between source/destination to find the binding; if it failed to find corresponding binding, it assumes that the binding is already deleted and returns no error
func (r *BindingReconciler) deleteBinding(ctx context.Context, client *rabbithole.Client, binding *topology.Binding) error {
func (r *BindingReconciler) deleteBinding(ctx context.Context, client internal.RabbitMQClient, binding *topology.Binding) error {
logger := ctrl.LoggerFrom(ctx)

var info *rabbithole.BindingInfo
Expand Down Expand Up @@ -167,7 +169,7 @@ func (r *BindingReconciler) deleteBinding(ctx context.Context, client *rabbithol
return r.removeFinalizer(ctx, binding)
}

func (r *BindingReconciler) findBindingInfo(logger logr.Logger, binding *topology.Binding, client *rabbithole.Client) (*rabbithole.BindingInfo, error) {
func (r *BindingReconciler) findBindingInfo(logger logr.Logger, binding *topology.Binding, client internal.RabbitMQClient) (*rabbithole.BindingInfo, error) {
logger.Info("binding arguments set; listing bindings from server to complete deletion")
arguments := make(map[string]interface{})
if binding.Spec.Arguments != nil {
Expand Down
18 changes: 9 additions & 9 deletions controllers/exchange_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"errors"
"time"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
Expand All @@ -36,9 +35,10 @@ const exchangeFinalizer = "deletion.finalizers.exchanges.rabbitmq.com"
// ExchangeReconciler reconciles a Exchange object
type ExchangeReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
}

// +kubebuilder:rbac:groups=rabbitmq.com,resources=exchanges,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -52,13 +52,13 @@ func (r *ExchangeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return reconcile.Result{}, client.IgnoreNotFound(err)
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, exchange.Spec.RabbitmqClusterReference, exchange.Namespace)
rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, exchange.Spec.RabbitmqClusterReference, exchange.Namespace)
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && exchange.ObjectMeta.DeletionTimestamp.IsZero() {
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && exchange.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -103,7 +103,7 @@ func (r *ExchangeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

func (r *ExchangeReconciler) declareExchange(ctx context.Context, client *rabbithole.Client, exchange *topology.Exchange) error {
func (r *ExchangeReconciler) declareExchange(ctx context.Context, client internal.RabbitMQClient, exchange *topology.Exchange) error {
logger := ctrl.LoggerFrom(ctx)

settings, err := internal.GenerateExchangeSettings(exchange)
Expand Down Expand Up @@ -139,7 +139,7 @@ func (r *ExchangeReconciler) addFinalizerIfNeeded(ctx context.Context, e *topolo

// deletes exchange from rabbitmq server
// if server responds with '404' Not Found, it logs and does not requeue on error
func (r *ExchangeReconciler) deleteExchange(ctx context.Context, client *rabbithole.Client, exchange *topology.Exchange) error {
func (r *ExchangeReconciler) deleteExchange(ctx context.Context, client internal.RabbitMQClient, exchange *topology.Exchange) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
Expand Down
21 changes: 11 additions & 10 deletions controllers/permission_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"context"
"encoding/json"
"errors"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"time"

"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -26,9 +26,10 @@ const permissionFinalizer = "deletion.finalizers.permissions.rabbitmq.com"
// PermissionReconciler reconciles a Permission object
type PermissionReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
}

// +kubebuilder:rbac:groups=rabbitmq.com,resources=permissions,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -42,13 +43,13 @@ func (r *PermissionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return reconcile.Result{}, client.IgnoreNotFound(err)
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, permission.Spec.RabbitmqClusterReference, permission.Namespace)
rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, permission.Spec.RabbitmqClusterReference, permission.Namespace)
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && permission.ObjectMeta.DeletionTimestamp.IsZero() {
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && permission.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -93,7 +94,7 @@ func (r *PermissionReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

func (r *PermissionReconciler) updatePermissions(ctx context.Context, client *rabbithole.Client, permission *topology.Permission) error {
func (r *PermissionReconciler) updatePermissions(ctx context.Context, client internal.RabbitMQClient, permission *topology.Permission) error {
logger := ctrl.LoggerFrom(ctx)

if err := validateResponse(client.UpdatePermissionsIn(permission.Spec.Vhost, permission.Spec.User, internal.GeneratePermissions(permission))); err != nil {
Expand All @@ -118,7 +119,7 @@ func (r *PermissionReconciler) addFinalizerIfNeeded(ctx context.Context, permiss
return nil
}

func (r *PermissionReconciler) revokePermissions(ctx context.Context, client *rabbithole.Client, permission *topology.Permission) error {
func (r *PermissionReconciler) revokePermissions(ctx context.Context, client internal.RabbitMQClient, permission *topology.Permission) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
Expand Down
18 changes: 9 additions & 9 deletions controllers/policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"errors"
"time"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
Expand All @@ -36,9 +35,10 @@ const policyFinalizer = "deletion.finalizers.policies.rabbitmq.com"
// PolicyReconciler reconciles a Policy object
type PolicyReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
}

// +kubebuilder:rbac:groups=rabbitmq.com,resources=policies,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -53,13 +53,13 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return reconcile.Result{}, client.IgnoreNotFound(err)
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, policy.Spec.RabbitmqClusterReference, policy.Namespace)
rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, policy.Spec.RabbitmqClusterReference, policy.Namespace)
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && policy.ObjectMeta.DeletionTimestamp.IsZero() {
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && policy.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

// creates or updates a given policy using rabbithole client.PutPolicy
func (r *PolicyReconciler) putPolicy(ctx context.Context, client *rabbithole.Client, policy *topology.Policy) error {
func (r *PolicyReconciler) putPolicy(ctx context.Context, client internal.RabbitMQClient, policy *topology.Policy) error {
logger := ctrl.LoggerFrom(ctx)

generatePolicy, err := internal.GeneratePolicy(policy)
Expand Down Expand Up @@ -139,7 +139,7 @@ func (r *PolicyReconciler) addFinalizerIfNeeded(ctx context.Context, policy *top

// deletes policy from rabbitmq server
// if server responds with '404' Not Found, it logs and does not requeue on error
func (r *PolicyReconciler) deletePolicy(ctx context.Context, client *rabbithole.Client, policy *topology.Policy) error {
func (r *PolicyReconciler) deletePolicy(ctx context.Context, client internal.RabbitMQClient, policy *topology.Policy) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
Expand Down
18 changes: 9 additions & 9 deletions controllers/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"time"

"github.com/go-logr/logr"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
Expand All @@ -34,9 +33,10 @@ const deletionFinalizer = "deletion.finalizers.queues.rabbitmq.com"
// QueueReconciler reconciles a RabbitMQ Queue
type QueueReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
}

// +kubebuilder:rbac:groups=rabbitmq.com,resources=queues,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -57,13 +57,13 @@ func (r *QueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

// create rabbitmq http rabbitClient
rabbitClient, err := rabbitholeClient(ctx, r.Client, q.Spec.RabbitmqClusterReference, q.Namespace)
rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, q.Spec.RabbitmqClusterReference, q.Namespace)
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && q.ObjectMeta.DeletionTimestamp.IsZero() {
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && q.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -109,7 +109,7 @@ func (r *QueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
return ctrl.Result{}, nil
}

func (r *QueueReconciler) declareQueue(ctx context.Context, client *rabbithole.Client, q *topology.Queue) error {
func (r *QueueReconciler) declareQueue(ctx context.Context, client internal.RabbitMQClient, q *topology.Queue) error {
logger := ctrl.LoggerFrom(ctx)

queueSettings, err := internal.GenerateQueueSettings(q)
Expand Down Expand Up @@ -146,7 +146,7 @@ func (r *QueueReconciler) addFinalizerIfNeeded(ctx context.Context, q *topology.
// deletes queue from rabbitmq server
// if server responds with '404' Not Found, it logs and does not requeue on error
// queues could be deleted manually or gone because of AutoDelete
func (r *QueueReconciler) deleteQueue(ctx context.Context, client *rabbithole.Client, q *topology.Queue) error {
func (r *QueueReconciler) deleteQueue(ctx context.Context, client internal.RabbitMQClient, q *topology.Queue) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
Expand Down
18 changes: 9 additions & 9 deletions controllers/user_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"time"

"github.com/go-logr/logr"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
topology "github.com/rabbitmq/messaging-topology-operator/api/v1alpha2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
Expand All @@ -43,9 +42,10 @@ const (
// UserReconciler reconciles a User object
type UserReconciler struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RabbitmqClientFactory internal.RabbitMQClientFactory
}

// +kubebuilder:rbac:groups=rabbitmq.com,resources=users,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -60,13 +60,13 @@ func (r *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
return reconcile.Result{}, client.IgnoreNotFound(err)
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, user.Spec.RabbitmqClusterReference, user.Namespace)
rabbitClient, err := r.RabbitmqClientFactory(ctx, r.Client, user.Spec.RabbitmqClusterReference, user.Namespace)
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && user.ObjectMeta.DeletionTimestamp.IsZero() {
if errors.Is(err, internal.NoSuchRabbitmqClusterError) && user.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
} else if err != nil && !errors.Is(err, internal.NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func (r *UserReconciler) setUserStatus(ctx context.Context, user *topology.User)
return nil
}

func (r *UserReconciler) declareUser(ctx context.Context, client *rabbithole.Client, user *topology.User) error {
func (r *UserReconciler) declareUser(ctx context.Context, client internal.RabbitMQClient, user *topology.User) error {
logger := ctrl.LoggerFrom(ctx)

credentials, err := r.getUserCredentials(ctx, user)
Expand Down Expand Up @@ -298,7 +298,7 @@ func (r *UserReconciler) getUserCredentials(ctx context.Context, user *topology.
return credentials, nil
}

func (r *UserReconciler) deleteUser(ctx context.Context, client *rabbithole.Client, user *topology.User) error {
func (r *UserReconciler) deleteUser(ctx context.Context, client internal.RabbitMQClient, user *topology.User) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
Expand Down
Loading

0 comments on commit aed2478

Please sign in to comment.