Skip to content

Commit

Permalink
Allow topology objects to be deleted if cluster deleted (#52)
Browse files Browse the repository at this point in the history
- This prevents the finalizer step from failing if the cluster is already
down.

- Extract error message when RabbitmqCluster is gone during
an object deletion

Co-authored-by: Chunyi Lyu <[email protected]>
  • Loading branch information
coro and ChunyiLyu authored Mar 15, 2021
1 parent 8101ef4 commit 5d08beb
Show file tree
Hide file tree
Showing 11 changed files with 567 additions and 37 deletions.
1 change: 1 addition & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ const (
failedStatusUpdate = "Failed to update object status"
failedMarshalSpec = "Failed to marshal spec"
failedGenerateRabbitClient = "Failed to generate http rabbitClient"
noSuchRabbitDeletion = "RabbitmqCluster is already gone: cannot find its connection secret"
)
14 changes: 13 additions & 1 deletion controllers/exchange_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"context"
"encoding/json"
"errors"
"time"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -51,7 +53,12 @@ func (r *ExchangeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, exchange.Spec.RabbitmqClusterReference)
if err != nil {
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && exchange.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -135,6 +142,11 @@ func (r *ExchangeReconciler) addFinalizerIfNeeded(ctx context.Context, e *topolo
func (r *ExchangeReconciler) deleteExchange(ctx context.Context, client *rabbithole.Client, exchange *topologyv1alpha1.Exchange) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info(noSuchRabbitDeletion, "exchange", exchange.Name)
return r.removeFinalizer(ctx, exchange)
}

err := validateResponseForDeletion(client.DeleteExchange(exchange.Spec.Vhost, exchange.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find exchange in rabbitmq server; already deleted", "exchange", exchange.Spec.Name)
Expand Down
15 changes: 14 additions & 1 deletion controllers/policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"context"
"encoding/json"
"errors"
"time"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -52,8 +54,14 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, policy.Spec.RabbitmqClusterReference)
if err != nil {
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && policy.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}

if !policy.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -134,6 +142,11 @@ func (r *PolicyReconciler) addFinalizerIfNeeded(ctx context.Context, policy *top
func (r *PolicyReconciler) deletePolicy(ctx context.Context, client *rabbithole.Client, policy *topologyv1alpha1.Policy) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info(noSuchRabbitDeletion, "policy", policy.Name)
return r.removeFinalizer(ctx, policy)
}

err := validateResponseForDeletion(client.DeletePolicy(policy.Spec.Vhost, policy.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find policy in rabbitmq server; already deleted", "policy", policy.Spec.Name)
Expand Down
14 changes: 13 additions & 1 deletion controllers/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"context"
"encoding/json"
"errors"
"time"

"github.com/go-logr/logr"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1"
Expand Down Expand Up @@ -56,7 +58,12 @@ func (r *QueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl

// create rabbitmq http rabbitClient
rabbitClient, err := rabbitholeClient(ctx, r.Client, q.Spec.RabbitmqClusterReference)
if err != nil {
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && q.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -142,6 +149,11 @@ func (r *QueueReconciler) addFinalizerIfNeeded(ctx context.Context, q *topologyv
func (r *QueueReconciler) deleteQueue(ctx context.Context, client *rabbithole.Client, q *topologyv1alpha1.Queue) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info(noSuchRabbitDeletion, "queue", q.Name)
return r.removeFinalizer(ctx, q)
}

err := validateResponseForDeletion(client.DeleteQueue(q.Spec.Vhost, q.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find queue in rabbitmq server; already deleted", "queue", q.Spec.Name)
Expand Down
13 changes: 12 additions & 1 deletion controllers/user_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/go-logr/logr"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
Expand Down Expand Up @@ -60,7 +61,12 @@ func (r *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, user.Spec.RabbitmqClusterReference)
if err != nil {
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && user.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -298,6 +304,11 @@ func (r *UserReconciler) getUserCredentials(ctx context.Context, user *topologyv
func (r *UserReconciler) deleteUser(ctx context.Context, client *rabbithole.Client, user *topologyv1alpha1.User) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info(noSuchRabbitDeletion, "user", user.Name)
return r.removeFinalizer(ctx, user)
}

credentials, err := r.getUserCredentials(ctx, user)
if err != nil {
msg := "failed to retrieve user credentials secret from status"
Expand Down
14 changes: 12 additions & 2 deletions controllers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
"k8s.io/apimachinery/pkg/types"
)

var NoSuchRabbitmqClusterError = errors.New("RabbitmqCluster object does not exist")

// returns a http client for the given RabbitmqCluster
// assumes the RabbitmqCluster is reachable using its service's ClusterIP
func rabbitholeClient(ctx context.Context, c client.Client, rmq v1alpha1.RabbitmqClusterReference) (*rabbithole.Client, error) {
svc, secret, err := serviceSecretFromReference(ctx, c, rmq)
if err != nil {
return nil, fmt.Errorf("failed to get service or secret object from specified rabbitmqcluster: %v", err)
return nil, fmt.Errorf("failed to get service or secret object from specified rabbitmqcluster: %w", err)
}

ip := net.ParseIP(svc.Spec.ClusterIP)
Expand Down Expand Up @@ -79,9 +81,17 @@ func managementPort(svc *corev1.Service) (int, error) {
return 0, fmt.Errorf("failed to find 'management' or 'management-tls' from service %s", svc.Name)
}

func serviceSecretFromReference(ctx context.Context, c client.Client, rmq v1alpha1.RabbitmqClusterReference) (*corev1.Service, *corev1.Secret, error) {
func rabbitmqClusterFromReference(ctx context.Context, c client.Client, rmq v1alpha1.RabbitmqClusterReference) (*rabbitmqv1beta1.RabbitmqCluster, error) {
cluster := &rabbitmqv1beta1.RabbitmqCluster{}
if err := c.Get(ctx, types.NamespacedName{Name: rmq.Name, Namespace: rmq.Namespace}, cluster); err != nil {
return nil, fmt.Errorf("Failed to get cluster from reference: %s Error: %w", err, NoSuchRabbitmqClusterError)
}
return cluster, nil
}

func serviceSecretFromReference(ctx context.Context, c client.Client, rmq v1alpha1.RabbitmqClusterReference) (*corev1.Service, *corev1.Secret, error) {
cluster, err := rabbitmqClusterFromReference(ctx, c, rmq)
if err != nil {
return nil, nil, err
}

Expand Down
14 changes: 13 additions & 1 deletion controllers/vhost_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"errors"
"time"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -42,7 +44,12 @@ func (r *VhostReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, vhost.Spec.RabbitmqClusterReference)
if err != nil {
// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && vhost.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -120,6 +127,11 @@ func (r *VhostReconciler) addFinalizerIfNeeded(ctx context.Context, vhost *topol
func (r *VhostReconciler) deleteVhost(ctx context.Context, client *rabbithole.Client, vhost *topologyv1alpha1.Vhost) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info(noSuchRabbitDeletion, "vhost", vhost.Name)
return r.removeFinalizer(ctx, vhost)
}

err := validateResponseForDeletion(client.DeleteVhost(vhost.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find vhost in rabbitmq server; already deleted", "vhost", vhost.Spec.Name)
Expand Down
Loading

0 comments on commit 5d08beb

Please sign in to comment.