Skip to content

Commit

Permalink
Merge pull request #620 from rabbitmq/shovel
Browse files Browse the repository at this point in the history
Correct shovel properties type and expose more shovel configurations in CRD
  • Loading branch information
ChunyiLyu authored May 24, 2023
2 parents 6ffbec5 + 4907873 commit 6488550
Show file tree
Hide file tree
Showing 26 changed files with 485 additions and 169 deletions.
59 changes: 37 additions & 22 deletions api/v1beta1/shovel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package v1beta1
import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)

Expand All @@ -26,28 +27,42 @@ type ShovelSpec struct {
// +kubebuilder:validation:Required
UriSecret *corev1.LocalObjectReference `json:"uriSecret"`
// +kubebuilder:validation:Enum=on-confirm;on-publish;no-ack
AckMode string `json:"ackMode,omitempty"`
AddForwardHeaders bool `json:"addForwardHeaders,omitempty"`
DeleteAfter string `json:"deleteAfter,omitempty"`
DestinationAddForwardHeaders bool `json:"destAddForwardHeaders,omitempty"`
DestinationAddTimestampHeader bool `json:"destAddTimestampHeader,omitempty"`
DestinationAddress string `json:"destAddress,omitempty"`
DestinationApplicationProperties string `json:"destApplicationProperties,omitempty"`
DestinationExchange string `json:"destExchange,omitempty"`
DestinationExchangeKey string `json:"destExchangeKey,omitempty"`
DestinationProperties string `json:"destProperties,omitempty"`
DestinationProtocol string `json:"destProtocol,omitempty"`
DestinationPublishProperties string `json:"destPublishProperties,omitempty"`
DestinationQueue string `json:"destQueue,omitempty"`
PrefetchCount int `json:"prefetchCount,omitempty"`
ReconnectDelay int `json:"reconnectDelay,omitempty"`
SourceAddress string `json:"srcAddress,omitempty"`
SourceDeleteAfter string `json:"srcDeleteAfter,omitempty"`
SourceExchange string `json:"srcExchange,omitempty"`
SourceExchangeKey string `json:"srcExchangeKey,omitempty"`
SourcePrefetchCount int `json:"srcPrefetchCount,omitempty"`
SourceProtocol string `json:"srcProtocol,omitempty"`
SourceQueue string `json:"srcQueue,omitempty"`
AckMode string `json:"ackMode,omitempty"`
AddForwardHeaders bool `json:"addForwardHeaders,omitempty"`
DeleteAfter string `json:"deleteAfter,omitempty"`
DestinationAddForwardHeaders bool `json:"destAddForwardHeaders,omitempty"`
DestinationAddTimestampHeader bool `json:"destAddTimestampHeader,omitempty"`
DestinationAddress string `json:"destAddress,omitempty"`
DestinationExchange string `json:"destExchange,omitempty"`
DestinationExchangeKey string `json:"destExchangeKey,omitempty"`
// +kubebuilder:validation:Enum=amqp091;amqp10
DestinationProtocol string `json:"destProtocol,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
DestinationApplicationProperties *runtime.RawExtension `json:"destApplicationProperties,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
DestinationProperties *runtime.RawExtension `json:"destProperties,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
DestinationPublishProperties *runtime.RawExtension `json:"destPublishProperties,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
DestinationMessageAnnotations *runtime.RawExtension `json:"destMessageAnnotations,omitempty"`
DestinationQueue string `json:"destQueue,omitempty"`
PrefetchCount int `json:"prefetchCount,omitempty"`
ReconnectDelay int `json:"reconnectDelay,omitempty"`
SourceAddress string `json:"srcAddress,omitempty"`
SourceDeleteAfter string `json:"srcDeleteAfter,omitempty"`
SourceExchange string `json:"srcExchange,omitempty"`
SourceExchangeKey string `json:"srcExchangeKey,omitempty"`
SourcePrefetchCount int `json:"srcPrefetchCount,omitempty"`
// +kubebuilder:validation:Enum=amqp091;amqp10
SourceProtocol string `json:"srcProtocol,omitempty"`
SourceQueue string `json:"srcQueue,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
SourceConsumerArgs *runtime.RawExtension `json:"srcConsumerArgs,omitempty"`
}

// ShovelStatus defines the observed state of Shovel
Expand Down
59 changes: 54 additions & 5 deletions api/v1beta1/shovel_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
)

Expand Down Expand Up @@ -63,12 +64,13 @@ var _ = Describe("Shovel spec", func() {
DestinationAddForwardHeaders: true,
DestinationAddTimestampHeader: true,
DestinationAddress: "myQueue",
DestinationApplicationProperties: "a-property",
DestinationApplicationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)},
DestinationMessageAnnotations: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)},
DestinationExchange: "an-exchange",
DestinationExchangeKey: "a-key",
DestinationProperties: "a-property",
DestinationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)},
DestinationProtocol: "amqp091",
DestinationPublishProperties: "a-property",
DestinationPublishProperties: &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 1}`)},
DestinationQueue: "a-queue",
PrefetchCount: 10,
ReconnectDelay: 10,
Expand All @@ -79,6 +81,7 @@ var _ = Describe("Shovel spec", func() {
SourcePrefetchCount: 10,
SourceProtocol: "amqp091",
SourceQueue: "a-queue",
SourceConsumerArgs: &runtime.RawExtension{Raw: []byte(`{"arg": "arg-value"}`)},
}}
Expect(k8sClient.Create(ctx, &shovel)).To(Succeed())
fetched := &Shovel{}
Expand All @@ -98,10 +101,11 @@ var _ = Describe("Shovel spec", func() {
Expect(fetched.Spec.DestinationAddTimestampHeader).To(BeTrue())
Expect(fetched.Spec.DestinationAddForwardHeaders).To(BeTrue())
Expect(fetched.Spec.DestinationAddress).To(Equal("myQueue"))
Expect(fetched.Spec.DestinationApplicationProperties).To(Equal("a-property"))
Expect(fetched.Spec.DestinationApplicationProperties.Raw).To(Equal([]byte(`{"key":"a-property"}`)))
Expect(fetched.Spec.DestinationExchange).To(Equal("an-exchange"))
Expect(fetched.Spec.DestinationExchangeKey).To(Equal("a-key"))
Expect(fetched.Spec.DestinationProperties).To(Equal("a-property"))
Expect(fetched.Spec.DestinationProperties.Raw).To(Equal([]byte(`{"key":"a-property"}`)))
Expect(fetched.Spec.DestinationMessageAnnotations.Raw).To(Equal([]byte(`{"key":"a-property"}`)))
Expect(fetched.Spec.DestinationQueue).To(Equal("a-queue"))
Expect(fetched.Spec.PrefetchCount).To(Equal(10))
Expect(fetched.Spec.ReconnectDelay).To(Equal(10))
Expand All @@ -113,6 +117,7 @@ var _ = Describe("Shovel spec", func() {
Expect(fetched.Spec.SourcePrefetchCount).To(Equal(10))
Expect(fetched.Spec.SourceProtocol).To(Equal("amqp091"))
Expect(fetched.Spec.SourceQueue).To(Equal("a-queue"))
Expect(fetched.Spec.SourceConsumerArgs.Raw).To(Equal([]byte(`{"arg":"arg-value"}`)))
})

When("creating a shovel with an invalid 'AckMode' value", func() {
Expand All @@ -136,4 +141,48 @@ var _ = Describe("Shovel spec", func() {
Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-ackmode" is invalid: spec.ackMode: Unsupported value: "an-invalid-ackmode": supported values: "on-confirm", "on-publish", "no-ack"`))
})
})

When("creating a shovel with unsupported protocol", func() {
It("fails with validation errors", func() {
shovel := Shovel{
ObjectMeta: metav1.ObjectMeta{
Name: "an-invalid-destprotocol",
Namespace: namespace,
},
Spec: ShovelSpec{
Name: "an-invalid-destprotocol",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
SourceProtocol: "amqp091",
DestinationProtocol: "stomp",
}}
Expect(k8sClient.Create(ctx, &shovel)).To(HaveOccurred())
Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-destprotocol" is invalid: spec.destProtocol: Unsupported value: "stomp": supported values: "amqp091", "amqp10"`))
})

It("fails with validation errors", func() {
shovel := Shovel{
ObjectMeta: metav1.ObjectMeta{
Name: "an-invalid-srcprotocol",
Namespace: namespace,
},
Spec: ShovelSpec{
Name: "an-invalid-srcprotocol",
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "some-cluster",
},
UriSecret: &corev1.LocalObjectReference{
Name: "a-secret",
},
SourceProtocol: "mqtt",
DestinationProtocol: "amqp10",
}}
Expect(k8sClient.Create(ctx, &shovel)).To(HaveOccurred())
Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-srcprotocol" is invalid: spec.srcProtocol: Unsupported value: "mqtt": supported values: "amqp091", "amqp10"`))
})
})
})
22 changes: 22 additions & 0 deletions api/v1beta1/shovel_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var _ webhook.Validator = &Shovel{}
// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
// either rabbitmqClusterReference.name or rabbitmqClusterReference.connectionSecret must be provided but not both
func (s *Shovel) ValidateCreate() error {
if err := s.amqp10Validate(); err != nil {
return err
}
return s.Spec.RabbitmqClusterReference.ValidateOnCreate(s.GroupResource(), s.Name)
}

Expand All @@ -32,6 +35,10 @@ func (s *Shovel) ValidateUpdate(old runtime.Object) error {
return apierrors.NewBadRequest(fmt.Sprintf("expected a shovel but got a %T", oldShovel))
}

if err := s.amqp10Validate(); err != nil {
return err
}

detailMsg := "updates on name, vhost and rabbitmqClusterReference are all forbidden"
if s.Spec.Name != oldShovel.Spec.Name {
return apierrors.NewForbidden(s.GroupResource(), s.Name,
Expand All @@ -53,3 +60,18 @@ func (s *Shovel) ValidateUpdate(old runtime.Object) error {
func (s *Shovel) ValidateDelete() error {
return nil
}

func (s *Shovel) amqp10Validate() error {
var errorList field.ErrorList
if s.Spec.SourceProtocol == "amqp10" && s.Spec.SourceAddress == "" {
errorList = append(errorList, field.Required(field.NewPath("spec", "srcAddress"),
"must specify spec.srcAddress when spec.srcProtocol is amqp10"))
return apierrors.NewInvalid(GroupVersion.WithKind("Shovel").GroupKind(), s.Name, errorList)
}
if s.Spec.DestinationProtocol == "amqp10" && s.Spec.DestinationAddress == "" {
errorList = append(errorList, field.Required(field.NewPath("spec", "destAddress"),
"must specify spec.destAddress when spec.destProtocol is amqp10"))
return apierrors.NewInvalid(GroupVersion.WithKind("Shovel").GroupKind(), s.Name, errorList)
}
return nil
}
56 changes: 50 additions & 6 deletions api/v1beta1/shovel_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)

var _ = Describe("shovel webhook", func() {
Expand All @@ -25,12 +26,13 @@ var _ = Describe("shovel webhook", func() {
DestinationAddForwardHeaders: true,
DestinationAddTimestampHeader: true,
DestinationAddress: "myQueue",
DestinationApplicationProperties: "a-property",
DestinationApplicationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)},
DestinationExchange: "an-exchange",
DestinationExchangeKey: "a-key",
DestinationProperties: "a-property",
DestinationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)},
DestinationProtocol: "amqp091",
DestinationPublishProperties: "a-property",
DestinationPublishProperties: &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 1}`)},
DestinationMessageAnnotations: &runtime.RawExtension{Raw: []byte(`{"a-key": "an-annotation"}`)},
DestinationQueue: "a-queue",
PrefetchCount: 10,
ReconnectDelay: 10,
Expand All @@ -41,6 +43,7 @@ var _ = Describe("shovel webhook", func() {
SourcePrefetchCount: 10,
SourceProtocol: "amqp091",
SourceQueue: "a-queue",
SourceConsumerArgs: &runtime.RawExtension{Raw: []byte(`{"x-priority": 1}`)},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "a-cluster",
},
Expand All @@ -60,6 +63,20 @@ var _ = Describe("shovel webhook", func() {
notAllowed.Spec.RabbitmqClusterReference.ConnectionSecret = nil
Expect(apierrors.IsForbidden(notAllowed.ValidateCreate())).To(BeTrue())
})

It("spec.srcAddress must be set if spec.srcProtocol is amqp10", func() {
notValid := shovel.DeepCopy()
notValid.Spec.SourceProtocol = "amqp10"
notValid.Spec.SourceAddress = ""
Expect(apierrors.IsInvalid(notValid.ValidateCreate())).To(BeTrue())
})

It("spec.destAddress must be set if spec.destProtocol is amqp10", func() {
notValid := shovel.DeepCopy()
notValid.Spec.DestinationProtocol = "amqp10"
notValid.Spec.DestinationAddress = ""
Expect(apierrors.IsInvalid(notValid.ValidateCreate())).To(BeTrue())
})
})

Context("ValidateUpdate", func() {
Expand All @@ -83,6 +100,20 @@ var _ = Describe("shovel webhook", func() {
Expect(apierrors.IsForbidden(newShovel.ValidateUpdate(&shovel))).To(BeTrue())
})

It("spec.srcAddress must be set if spec.srcProtocol is amqp10", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.SourceProtocol = "amqp10"
newShovel.Spec.SourceAddress = ""
Expect(apierrors.IsInvalid(newShovel.ValidateUpdate(&shovel))).To(BeTrue())
})

It("spec.destAddress must be set if spec.destProtocol is amqp10", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationProtocol = "amqp10"
newShovel.Spec.DestinationAddress = ""
Expect(apierrors.IsInvalid(newShovel.ValidateUpdate(&shovel))).To(BeTrue())
})

It("does not allow updates on rabbitmqClusterReference.connectionSecret", func() {
connectionScr := Shovel{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -156,7 +187,7 @@ var _ = Describe("shovel webhook", func() {

It("allows updates on DestinationApplicationProperties", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationApplicationProperties = "new-property"
newShovel.Spec.DestinationApplicationProperties = &runtime.RawExtension{Raw: []byte(`{"key": "new"}`)}
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})

Expand All @@ -174,9 +205,10 @@ var _ = Describe("shovel webhook", func() {

It("allows updates on DestinationProperties", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationProperties = "new"
newShovel.Spec.DestinationProperties = &runtime.RawExtension{Raw: []byte(`{"key": "new"}`)}
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})

It("allows updates on DestinationProtocol", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationProtocol = "new"
Expand All @@ -185,7 +217,13 @@ var _ = Describe("shovel webhook", func() {

It("allows updates on DestinationPublishProperties", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationPublishProperties = "new-property"
newShovel.Spec.DestinationPublishProperties = &runtime.RawExtension{Raw: []byte(`{"key": "new"}`)}
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})

It("allows updates on DestinationMessageAnnotations", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationMessageAnnotations = &runtime.RawExtension{Raw: []byte(`{"key": "new-annotation"}`)}
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})

Expand Down Expand Up @@ -248,5 +286,11 @@ var _ = Describe("shovel webhook", func() {
newShovel.Spec.SourceQueue = "another-queue"
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})

It("allows updates on SourceConsumerArgs", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.SourceConsumerArgs = &runtime.RawExtension{Raw: []byte(`{"x-priority": 10}`)}
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})
})
})
25 changes: 25 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions config/crd/bases/rabbitmq.com_bindings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.11.3
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: bindings.rabbitmq.com
spec:
group: rabbitmq.com
Expand Down
3 changes: 1 addition & 2 deletions config/crd/bases/rabbitmq.com_exchanges.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.11.3
creationTimestamp: null
controller-gen.kubebuilder.io/version: v0.11.4
name: exchanges.rabbitmq.com
spec:
group: rabbitmq.com
Expand Down
Loading

0 comments on commit 6488550

Please sign in to comment.