Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow topology objects to be deleted if cluster deleted #52

Merged
merged 6 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion controllers/exchange_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"context"
"encoding/json"
"errors"
"time"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -50,7 +52,13 @@ func (r *ExchangeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, exchange.Spec.RabbitmqClusterReference)
if err != nil {

// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && exchange.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, "Failed to generate http rabbitClient")
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -120,6 +128,11 @@ func (r *ExchangeReconciler) addFinalizerIfNeeded(ctx context.Context, e *topolo
func (r *ExchangeReconciler) deleteExchange(ctx context.Context, client *rabbithole.Client, exchange *topologyv1alpha1.Exchange) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info("Not deleting exchange as RabbitmqCluster is already deleted or down", "exchange", exchange.Name)
return r.removeFinalizer(ctx, exchange)
}

err := validateResponseForDeletion(client.DeleteExchange(exchange.Spec.Vhost, exchange.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find exchange in rabbitmq server; already deleted", "exchange", exchange.Spec.Name)
Expand Down
18 changes: 16 additions & 2 deletions controllers/policy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"context"
"encoding/json"
"errors"
"time"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -51,8 +53,15 @@ func (r *PolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, policy.Spec.RabbitmqClusterReference)
if err != nil {
logger.Error(err, "Failed to generate http rabbit client")

// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && policy.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, "Failed to generate http rabbitClient")
return reconcile.Result{}, err
}

if !policy.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -119,6 +128,11 @@ func (r *PolicyReconciler) addFinalizerIfNeeded(ctx context.Context, policy *top
func (r *PolicyReconciler) deletePolicy(ctx context.Context, client *rabbithole.Client, policy *topologyv1alpha1.Policy) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info("Not deleting policy as RabbitmqCluster is already deleted or down", "policy", policy.Name)
return r.removeFinalizer(ctx, policy)
}

err := validateResponseForDeletion(client.DeletePolicy(policy.Spec.Vhost, policy.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find policy in rabbitmq server; already deleted", "policy", policy.Spec.Name)
Expand Down
15 changes: 14 additions & 1 deletion controllers/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"context"
"encoding/json"
"errors"
"time"

"github.com/go-logr/logr"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1"
Expand Down Expand Up @@ -55,7 +57,13 @@ func (r *QueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl

// create rabbitmq http rabbitClient
rabbitClient, err := rabbitholeClient(ctx, r.Client, q.Spec.RabbitmqClusterReference)
if err != nil {

// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && q.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, "Failed to generate http rabbitClient")
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -127,6 +135,11 @@ func (r *QueueReconciler) addFinalizerIfNeeded(ctx context.Context, q *topologyv
func (r *QueueReconciler) deleteQueue(ctx context.Context, client *rabbithole.Client, q *topologyv1alpha1.Queue) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info("Not deleting queue as RabbitmqCluster is already deleted or down", "queue", q.Name)
ChunyiLyu marked this conversation as resolved.
Show resolved Hide resolved
return r.removeFinalizer(ctx, q)
}

err := validateResponseForDeletion(client.DeleteQueue(q.Spec.Vhost, q.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find queue in rabbitmq server; already deleted", "queue", q.Spec.Name)
Expand Down
14 changes: 13 additions & 1 deletion controllers/user_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/go-logr/logr"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
Expand Down Expand Up @@ -60,7 +61,13 @@ func (r *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, user.Spec.RabbitmqClusterReference)
if err != nil {

// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && user.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, "Failed to generate http rabbitClient")
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -283,6 +290,11 @@ func (r *UserReconciler) getUserCredentials(ctx context.Context, user *topologyv
func (r *UserReconciler) deleteUser(ctx context.Context, client *rabbithole.Client, user *topologyv1alpha1.User) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info("Not deleting user as RabbitmqCluster is already deleted or down", "user", user.Name)
return r.removeFinalizer(ctx, user)
}

credentials, err := r.getUserCredentials(ctx, user)
if err != nil {
msg := "failed to retrieve user credentials secret from status"
Expand Down
14 changes: 12 additions & 2 deletions controllers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ import (
"k8s.io/apimachinery/pkg/types"
)

var NoSuchRabbitmqClusterError = errors.New("RabbitmqCluster object does not exist")

// returns a http client for the given RabbitmqCluster
// assumes the RabbitmqCluster is reachable using its service's ClusterIP
func rabbitholeClient(ctx context.Context, c client.Client, rmq v1alpha1.RabbitmqClusterReference) (*rabbithole.Client, error) {
svc, secret, err := serviceSecretFromReference(ctx, c, rmq)
if err != nil {
return nil, fmt.Errorf("failed to get service or secret object from specified rabbitmqcluster: %v", err)
return nil, fmt.Errorf("failed to get service or secret object from specified rabbitmqcluster: %w", err)
}

ip := net.ParseIP(svc.Spec.ClusterIP)
Expand Down Expand Up @@ -79,9 +81,17 @@ func managementPort(svc *corev1.Service) (int, error) {
return 0, fmt.Errorf("failed to find 'management' or 'management-tls' from service %s", svc.Name)
}

func serviceSecretFromReference(ctx context.Context, c client.Client, rmq v1alpha1.RabbitmqClusterReference) (*corev1.Service, *corev1.Secret, error) {
func rabbitmqClusterFromReference(ctx context.Context, c client.Client, rmq v1alpha1.RabbitmqClusterReference) (*rabbitmqv1beta1.RabbitmqCluster, error) {
cluster := &rabbitmqv1beta1.RabbitmqCluster{}
if err := c.Get(ctx, types.NamespacedName{Name: rmq.Name, Namespace: rmq.Namespace}, cluster); err != nil {
return nil, fmt.Errorf("Failed to get cluster from reference: %s Error: %w", err, NoSuchRabbitmqClusterError)
}
return cluster, nil
}

func serviceSecretFromReference(ctx context.Context, c client.Client, rmq v1alpha1.RabbitmqClusterReference) (*corev1.Service, *corev1.Secret, error) {
cluster, err := rabbitmqClusterFromReference(ctx, c, rmq)
if err != nil {
return nil, nil, err
}

Expand Down
15 changes: 14 additions & 1 deletion controllers/vhost_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"errors"
"time"

rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -41,7 +43,13 @@ func (r *VhostReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, vhost.Spec.RabbitmqClusterReference)
if err != nil {

// If the object is not being deleted, but the RabbitmqCluster no longer exists, it could be that
// the Cluster is temporarily down. Requeue until it comes back up.
if errors.Is(err, NoSuchRabbitmqClusterError) && vhost.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existant cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, "Failed to generate http rabbitClient")
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -105,6 +113,11 @@ func (r *VhostReconciler) addFinalizerIfNeeded(ctx context.Context, vhost *topol
func (r *VhostReconciler) deleteVhost(ctx context.Context, client *rabbithole.Client, vhost *topologyv1alpha1.Vhost) error {
logger := ctrl.LoggerFrom(ctx)

if client == nil {
logger.Info("Not deleting vhost as RabbitmqCluster is already deleted or down", "vhost", vhost.Name)
return r.removeFinalizer(ctx, vhost)
}

err := validateResponseForDeletion(client.DeleteVhost(vhost.Spec.Name))
if errors.Is(err, NotFound) {
logger.Info("cannot find vhost in rabbitmq server; already deleted", "vhost", vhost.Spec.Name)
Expand Down
160 changes: 160 additions & 0 deletions system_tests/deletion_system_tests.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package system_tests

import (
"context"

rabbitmqv1beta1 "github.com/rabbitmq/cluster-operator/api/v1beta1"
topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

var _ = Describe("Deletion", func() {
var (
namespace = MustHaveEnv("NAMESPACE")
ctx = context.Background()
targetCluster *rabbitmqv1beta1.RabbitmqCluster
exchange topologyv1alpha1.Exchange
policy topologyv1alpha1.Policy
queue topologyv1alpha1.Queue
user topologyv1alpha1.User
vhost topologyv1alpha1.Vhost
)

BeforeEach(func() {
targetCluster = setupTestRabbitmqCluster("to-be-deleted", namespace)
targetClusterRef := topologyv1alpha1.RabbitmqClusterReference{Name: targetCluster.Name, Namespace: targetCluster.Namespace}
exchange = topologyv1alpha1.Exchange{
ObjectMeta: metav1.ObjectMeta{
Name: "exchange-deletion-test",
Namespace: namespace,
},
Spec: topologyv1alpha1.ExchangeSpec{
Name: "exchange-deletion-test",
RabbitmqClusterReference: targetClusterRef,
},
}
policy = topologyv1alpha1.Policy{
ObjectMeta: metav1.ObjectMeta{
Name: "policy-deletion-test",
Namespace: namespace,
},
Spec: topologyv1alpha1.PolicySpec{
Name: "polocy-deletion-test",
Pattern: ".*",
ApplyTo: "queues",
Definition: &runtime.RawExtension{
Raw: []byte(`{"ha-mode":"all"}`),
},
RabbitmqClusterReference: targetClusterRef,
},
}
queue = topologyv1alpha1.Queue{
ObjectMeta: metav1.ObjectMeta{
Name: "queue-deletion-test",
Namespace: namespace,
},
Spec: topologyv1alpha1.QueueSpec{
Name: "queue-deletion-test",
RabbitmqClusterReference: targetClusterRef,
},
}
user = topologyv1alpha1.User{
ObjectMeta: metav1.ObjectMeta{
Name: "user-deletion-test",
Namespace: namespace,
},
Spec: topologyv1alpha1.UserSpec{
RabbitmqClusterReference: targetClusterRef,
},
}
vhost = topologyv1alpha1.Vhost{
ObjectMeta: metav1.ObjectMeta{
Name: "vhost-deletion-test",
Namespace: namespace,
},
Spec: topologyv1alpha1.VhostSpec{
Name: "vhost-deletion-test",
RabbitmqClusterReference: targetClusterRef,
},
}
Expect(k8sClient.Create(ctx, &exchange)).To(Succeed())
Expect(k8sClient.Create(ctx, &policy)).To(Succeed())
Expect(k8sClient.Create(ctx, &queue)).To(Succeed())
Expect(k8sClient.Create(ctx, &user)).To(Succeed())
Expect(k8sClient.Create(ctx, &vhost)).To(Succeed())
})

It("handles the referenced RabbitmqCluster being deleted", func() {
Expect(k8sClient.Delete(ctx, &rabbitmqv1beta1.RabbitmqCluster{ObjectMeta: metav1.ObjectMeta{Name: targetCluster.Name, Namespace: targetCluster.Namespace}})).To(Succeed())
Eventually(func() string {
output, _ := kubectl(
"-n",
targetCluster.Namespace,
"get",
"rabbitmqclusters",
targetCluster.Name,
)
return string(output)
}, 90, 10).Should(ContainSubstring("not found"))
By("allowing the topology objects to be deleted")
Expect(k8sClient.Delete(ctx, &exchange)).To(Succeed())
Eventually(func() string {
output, _ := kubectl(
"-n",
exchange.Namespace,
"get",
"exchange",
exchange.Name,
)
return string(output)
}, 30, 10).Should(ContainSubstring("not found"))
Expect(k8sClient.Delete(ctx, &policy)).To(Succeed())
Eventually(func() string {
output, _ := kubectl(
"-n",
policy.Namespace,
"get",
"policy",
policy.Name,
)
return string(output)
}, 30, 10).Should(ContainSubstring("not found"))
Expect(k8sClient.Delete(ctx, &queue)).To(Succeed())
Eventually(func() string {
output, _ := kubectl(
"-n",
queue.Namespace,
"get",
"queue",
queue.Name,
)
return string(output)
}, 30, 10).Should(ContainSubstring("not found"))
Expect(k8sClient.Delete(ctx, &user)).To(Succeed())
Eventually(func() string {
output, _ := kubectl(
"-n",
user.Namespace,
"get",
"user",
user.Name,
)
return string(output)
}, 30, 10).Should(ContainSubstring("not found"))
Expect(k8sClient.Delete(ctx, &vhost)).To(Succeed())
Eventually(func() string {
output, _ := kubectl(
"-n",
vhost.Namespace,
"get",
"vhost",
vhost.Name,
)
return string(output)
}, 30, 10).Should(ContainSubstring("not found"))
})
})
Loading