Skip to content

Commit

Permalink
Support RabbitMQ operator policies
Browse files Browse the repository at this point in the history
Closes #202
  • Loading branch information
ansd committed Jan 26, 2024
1 parent 1bd6d98 commit a1ee0ec
Show file tree
Hide file tree
Showing 35 changed files with 2,191 additions and 98 deletions.
90 changes: 90 additions & 0 deletions api/v1beta1/operatorpolicy_types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package v1beta1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// OperatorPolicySpec defines the desired state of OperatorPolicy
// https://www.rabbitmq.com/parameters.html#operator-policies
type OperatorPolicySpec struct {
// Required property; cannot be updated
// +kubebuilder:validation:Required
Name string `json:"name"`
// Default to vhost '/'; cannot be updated
// +kubebuilder:default:=/
Vhost string `json:"vhost,omitempty"`
// Regular expression pattern used to match queues, e.g. "^my-queue$".
// Required property.
// +kubebuilder:validation:Required
Pattern string `json:"pattern"`
// What this operator policy applies to: 'queues', 'classic_queues', 'quorum_queues', 'streams'.
// Default to 'queues'.
// +kubebuilder:validation:Enum=queues;classic_queues;quorum_queues;streams
// +kubebuilder:default:=queues
ApplyTo string `json:"applyTo,omitempty"`
// Default to '0'.
// In the event that more than one operator policy can match a given queue, the operator policy with the greatest priority applies.
// +kubebuilder:default:=0
Priority int `json:"priority,omitempty"`
// OperatorPolicy definition. Required property.
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
// +kubebuilder:validation:Required
Definition *runtime.RawExtension `json:"definition"`
// Reference to the RabbitmqCluster that the operator policy will be created in.
// Required property.
// +kubebuilder:validation:Required
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
}

// OperatorPolicyStatus defines the observed state of OperatorPolicy
type OperatorPolicyStatus struct {
// observedGeneration is the most recent successful generation observed for this OperatorPolicy. It corresponds to the
// OperatorPolicy's generation, which is updated on mutation by the API Server.
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
Conditions []Condition `json:"conditions,omitempty"`
}

// +genclient
// +kubebuilder:object:root=true
// +kubebuilder:resource:categories=all;rabbitmq
// +kubebuilder:subresource:status

// OperatorPolicy is the Schema for the operator policies API
type OperatorPolicy struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Spec OperatorPolicySpec `json:"spec,omitempty"`
Status OperatorPolicyStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

// OperatorPolicyList contains a list of OperatorPolicy
type OperatorPolicyList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []OperatorPolicy `json:"items"`
}

func (p *OperatorPolicy) GroupResource() schema.GroupResource {
return schema.GroupResource{
Group: p.GroupVersionKind().Group,
Resource: p.GroupVersionKind().Kind,
}
}

func (p *OperatorPolicy) RabbitReference() RabbitmqClusterReference {
return p.Spec.RabbitmqClusterReference
}

func (p *OperatorPolicy) SetStatusConditions(c []Condition) {
p.Status.Conditions = c
}

func init() {
SchemeBuilder.Register(&OperatorPolicy{}, &OperatorPolicyList{})
}
117 changes: 117 additions & 0 deletions api/v1beta1/operatorpolicy_types_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package v1beta1

import (
"context"

"k8s.io/apimachinery/pkg/runtime"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

var _ = Describe("OperatorPolicy", func() {
var (
namespace = "default"
ctx = context.Background()
)

It("creates an operator policy with minimal configurations", func() {
policy := OperatorPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "test-operator-policy",
Namespace: namespace,
},
Spec: OperatorPolicySpec{
Name: "test-operator-policy",
Pattern: "^some-prefix",
Definition: &runtime.RawExtension{
Raw: []byte(`{"key":"value"}`),
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &policy)).To(Succeed())
fetched := &OperatorPolicy{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: policy.Name,
Namespace: policy.Namespace,
}, fetched)).To(Succeed())
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(RabbitmqClusterReference{
Name: "some-cluster",
}))
Expect(fetched.Spec.Name).To(Equal("test-operator-policy"))
Expect(fetched.Spec.Vhost).To(Equal("/"))
Expect(fetched.Spec.Pattern).To(Equal("^some-prefix"))
Expect(fetched.Spec.ApplyTo).To(Equal("queues"))
Expect(fetched.Spec.Priority).To(Equal(0))
Expect(fetched.Spec.Definition.Raw).To(Equal([]byte(`{"key":"value"}`)))
})

It("creates operator policy with configurations", func() {
policy := OperatorPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "random-policy",
Namespace: namespace,
},
Spec: OperatorPolicySpec{
Name: "test-policy",
Vhost: "/hello",
Pattern: "*.",
ApplyTo: "quorum_queues",
Priority: 100,
Definition: &runtime.RawExtension{
Raw: []byte(`{"key":"value"}`),
},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "random-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &policy)).To(Succeed())
fetched := &OperatorPolicy{}
Expect(k8sClient.Get(ctx, types.NamespacedName{
Name: policy.Name,
Namespace: policy.Namespace,
}, fetched)).To(Succeed())

Expect(fetched.Spec.Name).To(Equal("test-policy"))
Expect(fetched.Spec.Vhost).To(Equal("/hello"))
Expect(fetched.Spec.Pattern).To(Equal("*."))
Expect(fetched.Spec.ApplyTo).To(Equal("quorum_queues"))
Expect(fetched.Spec.Priority).To(Equal(100))
Expect(fetched.Spec.RabbitmqClusterReference).To(Equal(
RabbitmqClusterReference{
Name: "random-cluster",
}))
Expect(fetched.Spec.Definition.Raw).To(Equal([]byte(`{"key":"value"}`)))
})

When("creating a policy with an invalid 'ApplyTo' value", func() {
It("fails with validation errors", func() {
policy := OperatorPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "invalid",
Namespace: namespace,
},
Spec: OperatorPolicySpec{
Name: "test-policy",
Pattern: "a-queue-name",
Definition: &runtime.RawExtension{
Raw: []byte(`{"key":"value"}`),
},
ApplyTo: "yo-yo",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
},
}
Expect(k8sClient.Create(ctx, &policy)).To(HaveOccurred())
Expect(k8sClient.Create(ctx, &policy)).To(MatchError(`OperatorPolicy.rabbitmq.com "invalid" is invalid: spec.applyTo: Unsupported value: "yo-yo": supported values: "queues", "classic_queues", "quorum_queues", "streams"`))
})
})

})
57 changes: 57 additions & 0 deletions api/v1beta1/operatorpolicy_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package v1beta1

import (
"fmt"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
)

func (p *OperatorPolicy) SetupWebhookWithManager(mgr ctrl.Manager) error {
return ctrl.NewWebhookManagedBy(mgr).
For(p).
Complete()
}

// +kubebuilder:webhook:verbs=create;update,path=/validate-rabbitmq-com-v1beta1-operatorpolicy,mutating=false,failurePolicy=fail,groups=rabbitmq.com,resources=operatorpolicies,versions=v1beta1,name=voperatorpolicy.kb.io,sideEffects=none,admissionReviewVersions=v1

var _ webhook.Validator = &OperatorPolicy{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
// either rabbitmqClusterReference.name or rabbitmqClusterReference.connectionSecret must be provided but not both
func (p *OperatorPolicy) ValidateCreate() (admission.Warnings, error) {
return p.Spec.RabbitmqClusterReference.ValidateOnCreate(p.GroupResource(), p.Name)
}

// ValidateUpdate returns error type 'forbidden' for updates on operator policy name, vhost and rabbitmqClusterReference
func (p *OperatorPolicy) ValidateUpdate(old runtime.Object) (admission.Warnings, error) {
oldOperatorPolicy, ok := old.(*OperatorPolicy)
if !ok {
return nil, apierrors.NewBadRequest(fmt.Sprintf("expected an operator policy but got a %T", old))
}

detailMsg := "updates on name, vhost and rabbitmqClusterReference are all forbidden"
if p.Spec.Name != oldOperatorPolicy.Spec.Name {
return nil, apierrors.NewForbidden(p.GroupResource(), p.Name,
field.Forbidden(field.NewPath("spec", "name"), detailMsg))
}

if p.Spec.Vhost != oldOperatorPolicy.Spec.Vhost {
return nil, apierrors.NewForbidden(p.GroupResource(), p.Name,
field.Forbidden(field.NewPath("spec", "vhost"), detailMsg))
}

if !oldOperatorPolicy.Spec.RabbitmqClusterReference.Matches(&p.Spec.RabbitmqClusterReference) {
return nil, apierrors.NewForbidden(p.GroupResource(), p.Name,
field.Forbidden(field.NewPath("spec", "rabbitmqClusterReference"), detailMsg))
}
return nil, nil
}

func (p *OperatorPolicy) ValidateDelete() (admission.Warnings, error) {
return nil, nil
}
112 changes: 112 additions & 0 deletions api/v1beta1/operatorpolicy_webhook_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package v1beta1

import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

var _ = Describe("policy webhook", func() {
var policy = OperatorPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: OperatorPolicySpec{
Name: "test",
Vhost: "/test",
Pattern: "a-pattern",
ApplyTo: "queues",
Priority: 0,
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "a-cluster",
},
},
}

Context("ValidateCreate", func() {
It("does not allow both spec.rabbitmqClusterReference.name and spec.rabbitmqClusterReference.connectionSecret be configured", func() {
notAllowed := policy.DeepCopy()
notAllowed.Spec.RabbitmqClusterReference.ConnectionSecret = &corev1.LocalObjectReference{Name: "some-secret"}
Expect(apierrors.IsForbidden(ignoreNilWarning(notAllowed.ValidateCreate()))).To(BeTrue())
})

It("spec.rabbitmqClusterReference.name and spec.rabbitmqClusterReference.connectionSecret cannot both be empty", func() {
notAllowed := policy.DeepCopy()
notAllowed.Spec.RabbitmqClusterReference.Name = ""
notAllowed.Spec.RabbitmqClusterReference.ConnectionSecret = nil
Expect(apierrors.IsForbidden(ignoreNilWarning(notAllowed.ValidateCreate()))).To(BeTrue())
})
})

Context("ValidateUpdate", func() {
It("does not allow updates on operator policy name", func() {
newPolicy := policy.DeepCopy()
newPolicy.Spec.Name = "new-name"
Expect(apierrors.IsForbidden(ignoreNilWarning(newPolicy.ValidateUpdate(&policy)))).To(BeTrue())
})

It("does not allow updates on vhost", func() {
newPolicy := policy.DeepCopy()
newPolicy.Spec.Vhost = "new-vhost"
Expect(apierrors.IsForbidden(ignoreNilWarning(newPolicy.ValidateUpdate(&policy)))).To(BeTrue())
})

It("does not allow updates on RabbitmqClusterReference", func() {
newPolicy := policy.DeepCopy()
newPolicy.Spec.RabbitmqClusterReference = RabbitmqClusterReference{
Name: "new-cluster",
}
Expect(apierrors.IsForbidden(ignoreNilWarning(newPolicy.ValidateUpdate(&policy)))).To(BeTrue())
})

It("does not allow updates on rabbitmqClusterReference.connectionSecret", func() {
connectionScr := OperatorPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: OperatorPolicySpec{
Name: "test",
Vhost: "/test",
Pattern: "a-pattern",
ApplyTo: "all",
Priority: 0,
RabbitmqClusterReference: RabbitmqClusterReference{
ConnectionSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
},
},
}
new := connectionScr.DeepCopy()
new.Spec.RabbitmqClusterReference.ConnectionSecret.Name = "new-secret"
Expect(apierrors.IsForbidden(ignoreNilWarning(new.ValidateUpdate(&connectionScr)))).To(BeTrue())
})

It("allows updates on operator policy.spec.pattern", func() {
newPolicy := policy.DeepCopy()
newPolicy.Spec.Pattern = "new-pattern"
Expect(ignoreNilWarning(newPolicy.ValidateUpdate(&policy))).To(Succeed())
})

It("allows updates on operator policy.spec.applyTo", func() {
newPolicy := policy.DeepCopy()
newPolicy.Spec.ApplyTo = "queues"
Expect(ignoreNilWarning(newPolicy.ValidateUpdate(&policy))).To(Succeed())
})

It("allows updates on operator policy.spec.priority", func() {
newPolicy := policy.DeepCopy()
newPolicy.Spec.Priority = 1000
Expect(ignoreNilWarning(newPolicy.ValidateUpdate(&policy))).To(Succeed())
})

It("allows updates on operator policy.spec.definition", func() {
newPolicy := policy.DeepCopy()
newPolicy.Spec.Definition = &runtime.RawExtension{Raw: []byte(`{"key":"new-definition-value"}`)}
Expect(ignoreNilWarning(newPolicy.ValidateUpdate(&policy))).To(Succeed())
})
})
})
2 changes: 1 addition & 1 deletion api/v1beta1/policy_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type PolicySpec struct {
// +kubebuilder:pruning:PreserveUnknownFields
// +kubebuilder:validation:Required
Definition *runtime.RawExtension `json:"definition"`
// Reference to the RabbitmqCluster that the exchange will be created in.
// Reference to the RabbitmqCluster that the policy will be created in.
// Required property.
// +kubebuilder:validation:Required
RabbitmqClusterReference RabbitmqClusterReference `json:"rabbitmqClusterReference"`
Expand Down
Loading

0 comments on commit a1ee0ec

Please sign in to comment.