diff --git a/api/v1alpha1/binding_webhook.go b/api/v1alpha1/binding_webhook.go index e9024d34..3322e0b6 100644 --- a/api/v1alpha1/binding_webhook.go +++ b/api/v1alpha1/binding_webhook.go @@ -5,6 +5,7 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/validation/field" + "reflect" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/webhook" ) @@ -31,13 +32,64 @@ func (b *Binding) ValidateUpdate(old runtime.Object) error { return apierrors.NewBadRequest(fmt.Sprintf("expected a binding but got a %T", old)) } - if oldBinding.Spec != b.Spec { - return apierrors.NewForbidden( - b.GroupResource(), - b.Name, - field.Forbidden(field.NewPath("spec"), "binding.spec is immutable")) + var allErrs field.ErrorList + detailMsg := "updates on vhost and rabbitmqClusterReference are all forbidden" + + if b.Spec.Vhost != oldBinding.Spec.Vhost { + return apierrors.NewForbidden(b.GroupResource(), b.Name, + field.Forbidden(field.NewPath("spec", "vhost"), detailMsg)) } - return nil + + if b.Spec.RabbitmqClusterReference != oldBinding.Spec.RabbitmqClusterReference { + return apierrors.NewForbidden(b.GroupResource(), b.Name, + field.Forbidden(field.NewPath("spec", "rabbitmqClusterReference"), detailMsg)) + } + + if b.Spec.Source != oldBinding.Spec.Source { + allErrs = append(allErrs, field.Invalid( + field.NewPath("spec", "source"), + b.Spec.Source, + "source cannot be updated", + )) + } + + if b.Spec.Destination != oldBinding.Spec.Destination { + allErrs = append(allErrs, field.Invalid( + field.NewPath("spec", "destination"), + b.Spec.Destination, + "destination cannot be updated", + )) + } + + if b.Spec.DestinationType != oldBinding.Spec.DestinationType { + allErrs = append(allErrs, field.Invalid( + field.NewPath("spec", "destinationType"), + b.Spec.DestinationType, + "destinationType cannot be updated", + )) + } + + if b.Spec.RoutingKey != oldBinding.Spec.RoutingKey { + allErrs = append(allErrs, field.Invalid( + field.NewPath("spec", "routingKey"), + b.Spec.RoutingKey, + "routingKey cannot be updated", + )) + } + + if !reflect.DeepEqual(b.Spec.Arguments, oldBinding.Spec.Arguments) { + allErrs = append(allErrs, field.Invalid( + field.NewPath("spec", "arguments"), + b.Spec.Arguments, + "arguments cannot be updated", + )) + } + + if len(allErrs) == 0 { + return nil + } + + return apierrors.NewInvalid(GroupVersion.WithKind("Binding").GroupKind(), b.Name, allErrs) } // no validation logic on delete diff --git a/api/v1alpha1/binding_webhook_test.go b/api/v1alpha1/binding_webhook_test.go index e4b85f89..c450248d 100644 --- a/api/v1alpha1/binding_webhook_test.go +++ b/api/v1alpha1/binding_webhook_test.go @@ -32,42 +32,42 @@ var _ = Describe("Binding webhook", func() { Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) }) + It("does not allow updates on RabbitmqClusterReference", func() { + newBinding := oldBinding.DeepCopy() + newBinding.Spec.RabbitmqClusterReference = RabbitmqClusterReference{ + Name: "new-cluster", + Namespace: "default", + } + Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) + }) + It("does not allow updates on source", func() { newBinding := oldBinding.DeepCopy() newBinding.Spec.Source = "updated-source" - Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) + Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) }) It("does not allow updates on destination", func() { newBinding := oldBinding.DeepCopy() newBinding.Spec.Destination = "updated-des" - Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) + Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) }) It("does not allow updates on destination type", func() { newBinding := oldBinding.DeepCopy() newBinding.Spec.DestinationType = "exchange" - Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) + Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) }) It("does not allow updates on routing key", func() { newBinding := oldBinding.DeepCopy() newBinding.Spec.RoutingKey = "not-allowed" - Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) + Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) }) It("does not allow updates on binding arguments", func() { newBinding := oldBinding.DeepCopy() newBinding.Spec.Arguments = &runtime.RawExtension{Raw: []byte(`{"new":"new-value"}`)} - Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) - }) - - It("does not allow updates on RabbitmqClusterReference", func() { - newBinding := oldBinding.DeepCopy() - newBinding.Spec.RabbitmqClusterReference = RabbitmqClusterReference{ - Name: "new-cluster", - Namespace: "default", - } - Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) + Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue()) }) }) diff --git a/controllers/binding_controller.go b/controllers/binding_controller.go index 87516be5..d21ce865 100644 --- a/controllers/binding_controller.go +++ b/controllers/binding_controller.go @@ -12,12 +12,16 @@ package controllers import ( "context" "encoding/json" + "errors" rabbithole "github.com/michaelklishin/rabbit-hole/v2" "github.com/rabbitmq/messaging-topology-operator/internal" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" clientretry "k8s.io/client-go/util/retry" + "reflect" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/reconcile" + "time" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/runtime" @@ -27,6 +31,8 @@ import ( topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1" ) +const bindingFinalizer = "deletion.finalizers.bindings.rabbitmq.com" + // BindingReconciler reconciles a Binding object type BindingReconciler struct { client.Client @@ -47,11 +53,23 @@ func (r *BindingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct } rabbitClient, err := rabbitholeClient(ctx, r.Client, binding.Spec.RabbitmqClusterReference) - if err != nil { + + if errors.Is(err, NoSuchRabbitmqClusterError) && binding.ObjectMeta.DeletionTimestamp.IsZero() { + logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error()) + return reconcile.Result{RequeueAfter: 10 * time.Second}, err + } else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) { logger.Error(err, failedGenerateRabbitClient) return reconcile.Result{}, err } + if !binding.ObjectMeta.DeletionTimestamp.IsZero() { + logger.Info("Deleting") + return ctrl.Result{}, r.deleteBinding(ctx, rabbitClient, binding) + } + + if err := r.addFinalizerIfNeeded(ctx, binding); err != nil { + return ctrl.Result{}, err + } spec, err := json.Marshal(binding.Spec) if err != nil { logger.Error(err, failedMarshalSpec) @@ -106,6 +124,100 @@ func (r *BindingReconciler) declareBinding(ctx context.Context, client *rabbitho return nil } +// deletes binding from rabbitmq server; bindings have no name; server needs BindingInfo to delete them +// when server responds with '404' Not Found, it logs and does not requeue on error +// if no binding argument is set, generating properties key by using internal.GeneratePropertiesKey +// if binding arguments are set, list all bindings between source/destination to find the binding; if it failed to find corresponding binding, it assumes that the binding is already deleted and returns no error +func (r *BindingReconciler) deleteBinding(ctx context.Context, client *rabbithole.Client, binding *topologyv1alpha1.Binding) error { + logger := ctrl.LoggerFrom(ctx) + + var info *rabbithole.BindingInfo + var err error + if binding.Spec.Arguments != nil { + info, err = r.findBindingInfo(logger, binding, client) + if err != nil { + return err + } + if info == nil { + logger.Info("cannot find the corresponding binding info in rabbitmq server; binding already deleted") + return r.removeFinalizer(ctx, binding) + } + } else { + info, err = internal.GenerateBindingInfo(binding) + if err != nil { + msg := "failed to generate binding info" + r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg) + logger.Error(err, msg) + return err + } + info.PropertiesKey = internal.GeneratePropertiesKey(binding) + } + + err = validateResponseForDeletion(client.DeleteBinding(binding.Spec.Vhost, *info)) + if errors.Is(err, NotFound) { + logger.Info("cannot find binding in rabbitmq server; already deleted") + } else if err != nil { + msg := "failed to delete binding" + r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg) + logger.Error(err, msg) + return err + } + + logger.Info("Successfully deleted binding") + return r.removeFinalizer(ctx, binding) +} + +func (r *BindingReconciler) findBindingInfo(logger logr.Logger, binding *topologyv1alpha1.Binding, client *rabbithole.Client) (*rabbithole.BindingInfo, error) { + logger.Info("binding arguments set; listing bindings from server to complete deletion") + arguments := make(map[string]interface{}) + if binding.Spec.Arguments != nil { + if err := json.Unmarshal(binding.Spec.Arguments.Raw, &arguments); err != nil { + msg := "failed to unmarshall binding arguments" + r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg) + logger.Error(err, msg) + return nil, err + } + } + var bindingInfos []rabbithole.BindingInfo + var err error + if binding.Spec.DestinationType == "queue" { + bindingInfos, err = client.ListQueueBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination) + } else { + bindingInfos, err = client.ListExchangeBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination) + } + if err != nil { + msg := "failed to list binding infos" + r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg) + logger.Error(err, msg) + return nil, err + } + var info *rabbithole.BindingInfo + for _, b := range bindingInfos { + if binding.Spec.RoutingKey == b.RoutingKey && reflect.DeepEqual(b.Arguments, arguments) { + info = &b + } + } + return info, nil +} + +func (r *BindingReconciler) removeFinalizer(ctx context.Context, binding *topologyv1alpha1.Binding) error { + controllerutil.RemoveFinalizer(binding, bindingFinalizer) + if err := r.Client.Update(ctx, binding); err != nil { + return err + } + return nil +} + +func (r *BindingReconciler) addFinalizerIfNeeded(ctx context.Context, binding *topologyv1alpha1.Binding) error { + if binding.ObjectMeta.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(binding, bindingFinalizer) { + controllerutil.AddFinalizer(binding, bindingFinalizer) + if err := r.Client.Update(ctx, binding); err != nil { + return err + } + } + return nil +} + func (r *BindingReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&topologyv1alpha1.Binding{}). diff --git a/internal/binding_info.go b/internal/binding.go similarity index 60% rename from internal/binding_info.go rename to internal/binding.go index 3f395882..862b8049 100644 --- a/internal/binding_info.go +++ b/internal/binding.go @@ -14,6 +14,7 @@ import ( "fmt" rabbithole "github.com/michaelklishin/rabbit-hole/v2" topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1" + "strings" ) func GenerateBindingInfo(binding *topologyv1alpha1.Binding) (*rabbithole.BindingInfo, error) { @@ -33,3 +34,23 @@ func GenerateBindingInfo(binding *topologyv1alpha1.Binding) (*rabbithole.Binding Arguments: arguments, }, nil } + +// Generate binding properties key which is necessary when deleting a binding +// Binding properties key is: +// when routing key and argument are not provided, properties key is "~" +// when routing key is set and no argument is provided, properties key is the routing key itself +// if routing key has character '~', it's replaced by '%7E' +// when arguments are provided, properties key is the routing key (could be empty) plus the hash of arguments +// the hash function used is 'erlang:phash2' and it's erlang specific; GeneratePropertiesKey returns empty +// string if arguments are provided (deletion not supported) + +func GeneratePropertiesKey(binding *topologyv1alpha1.Binding) string { + if binding.Spec.RoutingKey == "" { + return "~" + } + if binding.Spec.Arguments == nil { + return strings.ReplaceAll(binding.Spec.RoutingKey, "~", "%7E") + } + + return "" +} diff --git a/internal/binding_info_test.go b/internal/binding_info_test.go deleted file mode 100644 index 7bfbeb3d..00000000 --- a/internal/binding_info_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package internal_test - -import ( - . "github.com/onsi/ginkgo" - . "github.com/onsi/gomega" - topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1" - "github.com/rabbitmq/messaging-topology-operator/internal" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" -) - -var _ = Describe("GenerateBindingInfo()", func() { - var binding *topologyv1alpha1.Binding - - BeforeEach(func() { - binding = &topologyv1alpha1.Binding{ - ObjectMeta: metav1.ObjectMeta{ - Name: "exchange", - }, - Spec: topologyv1alpha1.BindingSpec{ - Vhost: "/avhost", - Source: "test-exchange", - Destination: "test-queue", - DestinationType: "queue", - RoutingKey: "a-key", - }, - } - }) - - It("sets the correct vhost", func() { - info, err := internal.GenerateBindingInfo(binding) - Expect(err).NotTo(HaveOccurred()) - Expect(info.Vhost).To(Equal("/avhost")) - }) - - It("sets the correct source", func() { - info, err := internal.GenerateBindingInfo(binding) - Expect(err).NotTo(HaveOccurred()) - Expect(info.Source).To(Equal("test-exchange")) - }) - - It("sets the correct destination", func() { - info, err := internal.GenerateBindingInfo(binding) - Expect(err).NotTo(HaveOccurred()) - Expect(info.Destination).To(Equal("test-queue")) - }) - - It("sets the correct destination type", func() { - info, err := internal.GenerateBindingInfo(binding) - Expect(err).NotTo(HaveOccurred()) - Expect(info.DestinationType).To(Equal("queue")) - }) - - It("sets the correct routing key", func() { - info, err := internal.GenerateBindingInfo(binding) - Expect(err).NotTo(HaveOccurred()) - Expect(info.RoutingKey).To(Equal("a-key")) - }) - - When("exchange arguments are provided", func() { - It("generates the correct exchange arguments", func() { - binding.Spec.Arguments = &runtime.RawExtension{ - Raw: []byte(`{"argument": "argument-value"}`), - } - info, err := internal.GenerateBindingInfo(binding) - Expect(err).NotTo(HaveOccurred()) - Expect(info.Arguments).To(HaveLen(1)) - Expect(info.Arguments).To(HaveKeyWithValue("argument", "argument-value")) - }) - }) - -}) diff --git a/internal/binding_test.go b/internal/binding_test.go new file mode 100644 index 00000000..8665e41a --- /dev/null +++ b/internal/binding_test.go @@ -0,0 +1,109 @@ +package internal_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1" + "github.com/rabbitmq/messaging-topology-operator/internal" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +var _ = Describe("Binding", func() { + var binding *topologyv1alpha1.Binding + Context("GenerateBindingInfo", func() { + BeforeEach(func() { + binding = &topologyv1alpha1.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "exchange", + }, + Spec: topologyv1alpha1.BindingSpec{ + Vhost: "/avhost", + Source: "test-exchange", + Destination: "test-queue", + DestinationType: "queue", + RoutingKey: "a-key", + }, + } + }) + + It("sets the correct vhost", func() { + info, err := internal.GenerateBindingInfo(binding) + Expect(err).NotTo(HaveOccurred()) + Expect(info.Vhost).To(Equal("/avhost")) + }) + + It("sets the correct source", func() { + info, err := internal.GenerateBindingInfo(binding) + Expect(err).NotTo(HaveOccurred()) + Expect(info.Source).To(Equal("test-exchange")) + }) + + It("sets the correct destination", func() { + info, err := internal.GenerateBindingInfo(binding) + Expect(err).NotTo(HaveOccurred()) + Expect(info.Destination).To(Equal("test-queue")) + }) + + It("sets the correct destination type", func() { + info, err := internal.GenerateBindingInfo(binding) + Expect(err).NotTo(HaveOccurred()) + Expect(info.DestinationType).To(Equal("queue")) + }) + + It("sets the correct routing key", func() { + info, err := internal.GenerateBindingInfo(binding) + Expect(err).NotTo(HaveOccurred()) + Expect(info.RoutingKey).To(Equal("a-key")) + }) + + When("exchange arguments are provided", func() { + It("generates the correct exchange arguments", func() { + binding.Spec.Arguments = &runtime.RawExtension{ + Raw: []byte(`{"argument": "argument-value"}`), + } + info, err := internal.GenerateBindingInfo(binding) + Expect(err).NotTo(HaveOccurred()) + Expect(info.Arguments).To(HaveLen(1)) + Expect(info.Arguments).To(HaveKeyWithValue("argument", "argument-value")) + }) + }) + }) + + Context("GeneratePropertiesKey", func() { + BeforeEach(func() { + binding = &topologyv1alpha1.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Name: "exchange", + }, + Spec: topologyv1alpha1.BindingSpec{ + Vhost: "/avhost", + Source: "test-exchange", + Destination: "test-queue", + DestinationType: "queue", + }, + } + }) + + When("routing key is not set", func() { + It("returns the default properties key value", func() { + propertiesKey := internal.GeneratePropertiesKey(binding) + Expect(propertiesKey).To(Equal("~")) + }) + }) + + When("routing key is set", func() { + It("returns the routing key as properties key", func() { + binding.Spec.RoutingKey = "a-great-routing-key" + propertiesKey := internal.GeneratePropertiesKey(binding) + Expect(propertiesKey).To(Equal("a-great-routing-key")) + }) + + It("replaces character '~' if it's in the routing key", func() { + binding.Spec.RoutingKey = "special~routing~key" + propertiesKey := internal.GeneratePropertiesKey(binding) + Expect(propertiesKey).To(Equal("special%7Erouting%7Ekey")) + }) + }) + }) +}) diff --git a/system_tests/binding_system_tests.go b/system_tests/binding_system_test.go similarity index 91% rename from system_tests/binding_system_tests.go rename to system_tests/binding_system_test.go index 9c203f8f..7dd039a2 100644 --- a/system_tests/binding_system_tests.go +++ b/system_tests/binding_system_test.go @@ -82,7 +82,6 @@ var _ = Describe("Binding", func() { }) AfterEach(func() { - Expect(k8sClient.Delete(ctx, binding)).To(Succeed()) Expect(k8sClient.Delete(ctx, queue)).To(Succeed()) Expect(k8sClient.Delete(ctx, exchange)).To(Succeed()) }) @@ -129,6 +128,15 @@ var _ = Describe("Binding", func() { updateBinding := topologyv1alpha1.Binding{} Expect(k8sClient.Get(ctx, types.NamespacedName{Name: binding.Name, Namespace: binding.Namespace}, &updateBinding)).To(Succeed()) updatedBinding.Spec.RoutingKey = "new-key" - Expect(k8sClient.Update(ctx, &updatedBinding).Error()).To(ContainSubstring("spec: Forbidden: binding.spec is immutable")) + Expect(k8sClient.Update(ctx, &updatedBinding).Error()).To(ContainSubstring("invalid: spec.routingKey: Invalid value: \"new-key\": routingKey cannot be updated")) + + By("deleting binding from rabbitmq server") + Expect(k8sClient.Delete(ctx, binding)).To(Succeed()) + Eventually(func() int { + var err error + bindings, err := rabbitClient.ListQueueBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination) + Expect(err).NotTo(HaveOccurred()) + return len(bindings) + }, 10, 2).Should(Equal(0), "cannot find created binding") }) }) diff --git a/system_tests/deletion_system_tests.go b/system_tests/deletion_system_test.go similarity index 100% rename from system_tests/deletion_system_tests.go rename to system_tests/deletion_system_test.go diff --git a/system_tests/exchange_system_tests.go b/system_tests/exchange_system_test.go similarity index 100% rename from system_tests/exchange_system_tests.go rename to system_tests/exchange_system_test.go diff --git a/system_tests/policy_system_tests.go b/system_tests/policy_system_test.go similarity index 100% rename from system_tests/policy_system_tests.go rename to system_tests/policy_system_test.go diff --git a/system_tests/queue_system_tests.go b/system_tests/queue_system_test.go similarity index 100% rename from system_tests/queue_system_tests.go rename to system_tests/queue_system_test.go diff --git a/system_tests/user_system_tests.go b/system_tests/user_system_test.go similarity index 100% rename from system_tests/user_system_tests.go rename to system_tests/user_system_test.go diff --git a/system_tests/vhost_system_tests.go b/system_tests/vhost_system_test.go similarity index 100% rename from system_tests/vhost_system_tests.go rename to system_tests/vhost_system_test.go