Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support bindings.rabbitmq.com deletion #82

Merged
merged 5 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions api/v1alpha1/binding_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
"reflect"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
)
Expand All @@ -31,13 +32,64 @@ func (b *Binding) ValidateUpdate(old runtime.Object) error {
return apierrors.NewBadRequest(fmt.Sprintf("expected a binding but got a %T", old))
}

if oldBinding.Spec != b.Spec {
return apierrors.NewForbidden(
b.GroupResource(),
b.Name,
field.Forbidden(field.NewPath("spec"), "binding.spec is immutable"))
var allErrs field.ErrorList
detailMsg := "updates on vhost and rabbitmqClusterReference are all forbidden"

if b.Spec.Vhost != oldBinding.Spec.Vhost {
return apierrors.NewForbidden(b.GroupResource(), b.Name,
field.Forbidden(field.NewPath("spec", "vhost"), detailMsg))
}
return nil

if b.Spec.RabbitmqClusterReference != oldBinding.Spec.RabbitmqClusterReference {
return apierrors.NewForbidden(b.GroupResource(), b.Name,
field.Forbidden(field.NewPath("spec", "rabbitmqClusterReference"), detailMsg))
}

if b.Spec.Source != oldBinding.Spec.Source {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "source"),
b.Spec.Source,
"source cannot be updated",
))
}

if b.Spec.Destination != oldBinding.Spec.Destination {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "destination"),
b.Spec.Destination,
"destination cannot be updated",
))
}

if b.Spec.DestinationType != oldBinding.Spec.DestinationType {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "destinationType"),
b.Spec.DestinationType,
"destinationType cannot be updated",
))
}

if b.Spec.RoutingKey != oldBinding.Spec.RoutingKey {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "routingKey"),
b.Spec.RoutingKey,
"routingKey cannot be updated",
))
}

if !reflect.DeepEqual(b.Spec.Arguments, oldBinding.Spec.Arguments) {
allErrs = append(allErrs, field.Invalid(
field.NewPath("spec", "arguments"),
b.Spec.Arguments,
"arguments cannot be updated",
))
}

if len(allErrs) == 0 {
return nil
}

return apierrors.NewInvalid(GroupVersion.WithKind("Binding").GroupKind(), b.Name, allErrs)
}

// no validation logic on delete
Expand Down
28 changes: 14 additions & 14 deletions api/v1alpha1/binding_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,42 +32,42 @@ var _ = Describe("Binding webhook", func() {
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on RabbitmqClusterReference", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.RabbitmqClusterReference = RabbitmqClusterReference{
Name: "new-cluster",
Namespace: "default",
}
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on source", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.Source = "updated-source"
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on destination", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.Destination = "updated-des"
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on destination type", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.DestinationType = "exchange"
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on routing key", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.RoutingKey = "not-allowed"
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on binding arguments", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.Arguments = &runtime.RawExtension{Raw: []byte(`{"new":"new-value"}`)}
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})

It("does not allow updates on RabbitmqClusterReference", func() {
newBinding := oldBinding.DeepCopy()
newBinding.Spec.RabbitmqClusterReference = RabbitmqClusterReference{
Name: "new-cluster",
Namespace: "default",
}
Expect(apierrors.IsForbidden(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
Expect(apierrors.IsInvalid(newBinding.ValidateUpdate(&oldBinding))).To(BeTrue())
})
})
114 changes: 113 additions & 1 deletion controllers/binding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ package controllers
import (
"context"
"encoding/json"
"errors"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
"github.com/rabbitmq/messaging-topology-operator/internal"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
clientretry "k8s.io/client-go/util/retry"
"reflect"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -27,6 +31,8 @@ import (
topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1"
)

const bindingFinalizer = "deletion.finalizers.bindings.rabbitmq.com"

// BindingReconciler reconciles a Binding object
type BindingReconciler struct {
client.Client
Expand All @@ -47,11 +53,23 @@ func (r *BindingReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

rabbitClient, err := rabbitholeClient(ctx, r.Client, binding.Spec.RabbitmqClusterReference)
if err != nil {

if errors.Is(err, NoSuchRabbitmqClusterError) && binding.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Could not generate rabbitClient for non existent cluster: " + err.Error())
return reconcile.Result{RequeueAfter: 10 * time.Second}, err
} else if err != nil && !errors.Is(err, NoSuchRabbitmqClusterError) {
logger.Error(err, failedGenerateRabbitClient)
return reconcile.Result{}, err
}

if !binding.ObjectMeta.DeletionTimestamp.IsZero() {
logger.Info("Deleting")
return ctrl.Result{}, r.deleteBinding(ctx, rabbitClient, binding)
}

if err := r.addFinalizerIfNeeded(ctx, binding); err != nil {
return ctrl.Result{}, err
}
spec, err := json.Marshal(binding.Spec)
if err != nil {
logger.Error(err, failedMarshalSpec)
Expand Down Expand Up @@ -106,6 +124,100 @@ func (r *BindingReconciler) declareBinding(ctx context.Context, client *rabbitho
return nil
}

// deletes binding from rabbitmq server; bindings have no name; server needs BindingInfo to delete them
// when server responds with '404' Not Found, it logs and does not requeue on error
// if no binding argument is set, generating properties key by using internal.GeneratePropertiesKey
// if binding arguments are set, list all bindings between source/destination to find the binding; if it failed to find corresponding binding, it assumes that the binding is already deleted and returns no error
func (r *BindingReconciler) deleteBinding(ctx context.Context, client *rabbithole.Client, binding *topologyv1alpha1.Binding) error {
logger := ctrl.LoggerFrom(ctx)

var info *rabbithole.BindingInfo
var err error
if binding.Spec.Arguments != nil {
info, err = r.findBindingInfo(logger, binding, client)
if err != nil {
return err
}
if info == nil {
logger.Info("cannot find the corresponding binding info in rabbitmq server; binding already deleted")
return r.removeFinalizer(ctx, binding)
}
} else {
info, err = internal.GenerateBindingInfo(binding)
if err != nil {
msg := "failed to generate binding info"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return err
}
info.PropertiesKey = internal.GeneratePropertiesKey(binding)
}

err = validateResponseForDeletion(client.DeleteBinding(binding.Spec.Vhost, *info))
if errors.Is(err, NotFound) {
logger.Info("cannot find binding in rabbitmq server; already deleted")
} else if err != nil {
msg := "failed to delete binding"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return err
}

logger.Info("Successfully deleted binding")
return r.removeFinalizer(ctx, binding)
}

func (r *BindingReconciler) findBindingInfo(logger logr.Logger, binding *topologyv1alpha1.Binding, client *rabbithole.Client) (*rabbithole.BindingInfo, error) {
logger.Info("binding arguments set; listing bindings from server to complete deletion")
arguments := make(map[string]interface{})
if binding.Spec.Arguments != nil {
if err := json.Unmarshal(binding.Spec.Arguments.Raw, &arguments); err != nil {
msg := "failed to unmarshall binding arguments"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return nil, err
}
}
var bindingInfos []rabbithole.BindingInfo
var err error
if binding.Spec.DestinationType == "queue" {
bindingInfos, err = client.ListQueueBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination)
} else {
bindingInfos, err = client.ListExchangeBindingsBetween(binding.Spec.Vhost, binding.Spec.Source, binding.Spec.Destination)
}
if err != nil {
msg := "failed to list binding infos"
r.Recorder.Event(binding, corev1.EventTypeWarning, "FailedDelete", msg)
logger.Error(err, msg)
return nil, err
}
var info *rabbithole.BindingInfo
for _, b := range bindingInfos {
if binding.Spec.RoutingKey == b.RoutingKey && reflect.DeepEqual(b.Arguments, arguments) {
info = &b
}
}
return info, nil
}

func (r *BindingReconciler) removeFinalizer(ctx context.Context, binding *topologyv1alpha1.Binding) error {
controllerutil.RemoveFinalizer(binding, bindingFinalizer)
if err := r.Client.Update(ctx, binding); err != nil {
return err
}
return nil
}

func (r *BindingReconciler) addFinalizerIfNeeded(ctx context.Context, binding *topologyv1alpha1.Binding) error {
if binding.ObjectMeta.DeletionTimestamp.IsZero() && !controllerutil.ContainsFinalizer(binding, bindingFinalizer) {
controllerutil.AddFinalizer(binding, bindingFinalizer)
if err := r.Client.Update(ctx, binding); err != nil {
return err
}
}
return nil
}

func (r *BindingReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&topologyv1alpha1.Binding{}).
Expand Down
21 changes: 21 additions & 0 deletions internal/binding_info.go → internal/binding.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"fmt"
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
topologyv1alpha1 "github.com/rabbitmq/messaging-topology-operator/api/v1alpha1"
"strings"
)

func GenerateBindingInfo(binding *topologyv1alpha1.Binding) (*rabbithole.BindingInfo, error) {
Expand All @@ -33,3 +34,23 @@ func GenerateBindingInfo(binding *topologyv1alpha1.Binding) (*rabbithole.Binding
Arguments: arguments,
}, nil
}

// Generate binding properties key which is necessary when deleting a binding
// Binding properties key is:
// when routing key and argument are not provided, properties key is "~"
// when routing key is set and no argument is provided, properties key is the routing key itself
// if routing key has character '~', it's replaced by '%7E'
// when arguments are provided, properties key is the routing key (could be empty) plus the hash of arguments
// the hash function used is 'erlang:phash2' and it's erlang specific; GeneratePropertiesKey returns empty
// string if arguments are provided (deletion not supported)

func GeneratePropertiesKey(binding *topologyv1alpha1.Binding) string {
if binding.Spec.RoutingKey == "" {
return "~"
}
if binding.Spec.Arguments == nil {
return strings.ReplaceAll(binding.Spec.RoutingKey, "~", "%7E")
}

return ""
}
72 changes: 0 additions & 72 deletions internal/binding_info_test.go

This file was deleted.

Loading