diff --git a/api/v1beta1/shovel_types.go b/api/v1beta1/shovel_types.go index c5a5ee3a..c61d2ec2 100644 --- a/api/v1beta1/shovel_types.go +++ b/api/v1beta1/shovel_types.go @@ -3,6 +3,7 @@ package v1beta1 import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -26,28 +27,42 @@ type ShovelSpec struct { // +kubebuilder:validation:Required UriSecret *corev1.LocalObjectReference `json:"uriSecret"` // +kubebuilder:validation:Enum=on-confirm;on-publish;no-ack - AckMode string `json:"ackMode,omitempty"` - AddForwardHeaders bool `json:"addForwardHeaders,omitempty"` - DeleteAfter string `json:"deleteAfter,omitempty"` - DestinationAddForwardHeaders bool `json:"destAddForwardHeaders,omitempty"` - DestinationAddTimestampHeader bool `json:"destAddTimestampHeader,omitempty"` - DestinationAddress string `json:"destAddress,omitempty"` - DestinationApplicationProperties string `json:"destApplicationProperties,omitempty"` - DestinationExchange string `json:"destExchange,omitempty"` - DestinationExchangeKey string `json:"destExchangeKey,omitempty"` - DestinationProperties string `json:"destProperties,omitempty"` - DestinationProtocol string `json:"destProtocol,omitempty"` - DestinationPublishProperties string `json:"destPublishProperties,omitempty"` - DestinationQueue string `json:"destQueue,omitempty"` - PrefetchCount int `json:"prefetchCount,omitempty"` - ReconnectDelay int `json:"reconnectDelay,omitempty"` - SourceAddress string `json:"srcAddress,omitempty"` - SourceDeleteAfter string `json:"srcDeleteAfter,omitempty"` - SourceExchange string `json:"srcExchange,omitempty"` - SourceExchangeKey string `json:"srcExchangeKey,omitempty"` - SourcePrefetchCount int `json:"srcPrefetchCount,omitempty"` - SourceProtocol string `json:"srcProtocol,omitempty"` - SourceQueue string `json:"srcQueue,omitempty"` + AckMode string `json:"ackMode,omitempty"` + AddForwardHeaders bool `json:"addForwardHeaders,omitempty"` + DeleteAfter string `json:"deleteAfter,omitempty"` + DestinationAddForwardHeaders bool `json:"destAddForwardHeaders,omitempty"` + DestinationAddTimestampHeader bool `json:"destAddTimestampHeader,omitempty"` + DestinationAddress string `json:"destAddress,omitempty"` + DestinationExchange string `json:"destExchange,omitempty"` + DestinationExchangeKey string `json:"destExchangeKey,omitempty"` + // +kubebuilder:validation:Enum=amqp091;amqp10 + DestinationProtocol string `json:"destProtocol,omitempty"` + // +kubebuilder:validation:Type=object + // +kubebuilder:pruning:PreserveUnknownFields + DestinationApplicationProperties *runtime.RawExtension `json:"destApplicationProperties,omitempty"` + // +kubebuilder:validation:Type=object + // +kubebuilder:pruning:PreserveUnknownFields + DestinationProperties *runtime.RawExtension `json:"destProperties,omitempty"` + // +kubebuilder:validation:Type=object + // +kubebuilder:pruning:PreserveUnknownFields + DestinationPublishProperties *runtime.RawExtension `json:"destPublishProperties,omitempty"` + // +kubebuilder:validation:Type=object + // +kubebuilder:pruning:PreserveUnknownFields + DestinationMessageAnnotations *runtime.RawExtension `json:"destMessageAnnotations,omitempty"` + DestinationQueue string `json:"destQueue,omitempty"` + PrefetchCount int `json:"prefetchCount,omitempty"` + ReconnectDelay int `json:"reconnectDelay,omitempty"` + SourceAddress string `json:"srcAddress,omitempty"` + SourceDeleteAfter string `json:"srcDeleteAfter,omitempty"` + SourceExchange string `json:"srcExchange,omitempty"` + SourceExchangeKey string `json:"srcExchangeKey,omitempty"` + SourcePrefetchCount int `json:"srcPrefetchCount,omitempty"` + // +kubebuilder:validation:Enum=amqp091;amqp10 + SourceProtocol string `json:"srcProtocol,omitempty"` + SourceQueue string `json:"srcQueue,omitempty"` + // +kubebuilder:validation:Type=object + // +kubebuilder:pruning:PreserveUnknownFields + SourceConsumerArgs *runtime.RawExtension `json:"srcConsumerArgs,omitempty"` } // ShovelStatus defines the observed state of Shovel diff --git a/api/v1beta1/shovel_types_test.go b/api/v1beta1/shovel_types_test.go index 5b55c67f..47732689 100644 --- a/api/v1beta1/shovel_types_test.go +++ b/api/v1beta1/shovel_types_test.go @@ -6,6 +6,7 @@ import ( . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" ) @@ -63,12 +64,13 @@ var _ = Describe("Shovel spec", func() { DestinationAddForwardHeaders: true, DestinationAddTimestampHeader: true, DestinationAddress: "myQueue", - DestinationApplicationProperties: "a-property", + DestinationApplicationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)}, + DestinationMessageAnnotations: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)}, DestinationExchange: "an-exchange", DestinationExchangeKey: "a-key", - DestinationProperties: "a-property", + DestinationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)}, DestinationProtocol: "amqp091", - DestinationPublishProperties: "a-property", + DestinationPublishProperties: &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 1}`)}, DestinationQueue: "a-queue", PrefetchCount: 10, ReconnectDelay: 10, @@ -79,6 +81,7 @@ var _ = Describe("Shovel spec", func() { SourcePrefetchCount: 10, SourceProtocol: "amqp091", SourceQueue: "a-queue", + SourceConsumerArgs: &runtime.RawExtension{Raw: []byte(`{"arg": "arg-value"}`)}, }} Expect(k8sClient.Create(ctx, &shovel)).To(Succeed()) fetched := &Shovel{} @@ -98,10 +101,11 @@ var _ = Describe("Shovel spec", func() { Expect(fetched.Spec.DestinationAddTimestampHeader).To(BeTrue()) Expect(fetched.Spec.DestinationAddForwardHeaders).To(BeTrue()) Expect(fetched.Spec.DestinationAddress).To(Equal("myQueue")) - Expect(fetched.Spec.DestinationApplicationProperties).To(Equal("a-property")) + Expect(fetched.Spec.DestinationApplicationProperties.Raw).To(Equal([]byte(`{"key":"a-property"}`))) Expect(fetched.Spec.DestinationExchange).To(Equal("an-exchange")) Expect(fetched.Spec.DestinationExchangeKey).To(Equal("a-key")) - Expect(fetched.Spec.DestinationProperties).To(Equal("a-property")) + Expect(fetched.Spec.DestinationProperties.Raw).To(Equal([]byte(`{"key":"a-property"}`))) + Expect(fetched.Spec.DestinationMessageAnnotations.Raw).To(Equal([]byte(`{"key":"a-property"}`))) Expect(fetched.Spec.DestinationQueue).To(Equal("a-queue")) Expect(fetched.Spec.PrefetchCount).To(Equal(10)) Expect(fetched.Spec.ReconnectDelay).To(Equal(10)) @@ -113,6 +117,7 @@ var _ = Describe("Shovel spec", func() { Expect(fetched.Spec.SourcePrefetchCount).To(Equal(10)) Expect(fetched.Spec.SourceProtocol).To(Equal("amqp091")) Expect(fetched.Spec.SourceQueue).To(Equal("a-queue")) + Expect(fetched.Spec.SourceConsumerArgs.Raw).To(Equal([]byte(`{"arg":"arg-value"}`))) }) When("creating a shovel with an invalid 'AckMode' value", func() { @@ -136,4 +141,48 @@ var _ = Describe("Shovel spec", func() { Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-ackmode" is invalid: spec.ackMode: Unsupported value: "an-invalid-ackmode": supported values: "on-confirm", "on-publish", "no-ack"`)) }) }) + + When("creating a shovel with unsupported protocol", func() { + It("fails with validation errors", func() { + shovel := Shovel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "an-invalid-destprotocol", + Namespace: namespace, + }, + Spec: ShovelSpec{ + Name: "an-invalid-destprotocol", + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "some-cluster", + }, + UriSecret: &corev1.LocalObjectReference{ + Name: "a-secret", + }, + SourceProtocol: "amqp091", + DestinationProtocol: "stomp", + }} + Expect(k8sClient.Create(ctx, &shovel)).To(HaveOccurred()) + Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-destprotocol" is invalid: spec.destProtocol: Unsupported value: "stomp": supported values: "amqp091", "amqp10"`)) + }) + + It("fails with validation errors", func() { + shovel := Shovel{ + ObjectMeta: metav1.ObjectMeta{ + Name: "an-invalid-srcprotocol", + Namespace: namespace, + }, + Spec: ShovelSpec{ + Name: "an-invalid-srcprotocol", + RabbitmqClusterReference: RabbitmqClusterReference{ + Name: "some-cluster", + }, + UriSecret: &corev1.LocalObjectReference{ + Name: "a-secret", + }, + SourceProtocol: "mqtt", + DestinationProtocol: "amqp10", + }} + Expect(k8sClient.Create(ctx, &shovel)).To(HaveOccurred()) + Expect(k8sClient.Create(ctx, &shovel)).To(MatchError(`Shovel.rabbitmq.com "an-invalid-srcprotocol" is invalid: spec.srcProtocol: Unsupported value: "mqtt": supported values: "amqp091", "amqp10"`)) + }) + }) }) diff --git a/api/v1beta1/shovel_webhook.go b/api/v1beta1/shovel_webhook.go index 52581629..c55af6fe 100644 --- a/api/v1beta1/shovel_webhook.go +++ b/api/v1beta1/shovel_webhook.go @@ -22,6 +22,9 @@ var _ webhook.Validator = &Shovel{} // ValidateCreate implements webhook.Validator so a webhook will be registered for the type // either rabbitmqClusterReference.name or rabbitmqClusterReference.connectionSecret must be provided but not both func (s *Shovel) ValidateCreate() error { + if err := s.amqp10Validate(); err != nil { + return err + } return s.Spec.RabbitmqClusterReference.ValidateOnCreate(s.GroupResource(), s.Name) } @@ -32,6 +35,10 @@ func (s *Shovel) ValidateUpdate(old runtime.Object) error { return apierrors.NewBadRequest(fmt.Sprintf("expected a shovel but got a %T", oldShovel)) } + if err := s.amqp10Validate(); err != nil { + return err + } + detailMsg := "updates on name, vhost and rabbitmqClusterReference are all forbidden" if s.Spec.Name != oldShovel.Spec.Name { return apierrors.NewForbidden(s.GroupResource(), s.Name, @@ -53,3 +60,18 @@ func (s *Shovel) ValidateUpdate(old runtime.Object) error { func (s *Shovel) ValidateDelete() error { return nil } + +func (s *Shovel) amqp10Validate() error { + var errorList field.ErrorList + if s.Spec.SourceProtocol == "amqp10" && s.Spec.SourceAddress == "" { + errorList = append(errorList, field.Required(field.NewPath("spec", "srcAddress"), + "must specify spec.srcAddress when spec.srcProtocol is amqp10")) + return apierrors.NewInvalid(GroupVersion.WithKind("Shovel").GroupKind(), s.Name, errorList) + } + if s.Spec.DestinationProtocol == "amqp10" && s.Spec.DestinationAddress == "" { + errorList = append(errorList, field.Required(field.NewPath("spec", "destAddress"), + "must specify spec.destAddress when spec.destProtocol is amqp10")) + return apierrors.NewInvalid(GroupVersion.WithKind("Shovel").GroupKind(), s.Name, errorList) + } + return nil +} diff --git a/api/v1beta1/shovel_webhook_test.go b/api/v1beta1/shovel_webhook_test.go index 02928b79..2f2508ab 100644 --- a/api/v1beta1/shovel_webhook_test.go +++ b/api/v1beta1/shovel_webhook_test.go @@ -6,6 +6,7 @@ import ( corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ) var _ = Describe("shovel webhook", func() { @@ -25,12 +26,13 @@ var _ = Describe("shovel webhook", func() { DestinationAddForwardHeaders: true, DestinationAddTimestampHeader: true, DestinationAddress: "myQueue", - DestinationApplicationProperties: "a-property", + DestinationApplicationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)}, DestinationExchange: "an-exchange", DestinationExchangeKey: "a-key", - DestinationProperties: "a-property", + DestinationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)}, DestinationProtocol: "amqp091", - DestinationPublishProperties: "a-property", + DestinationPublishProperties: &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 1}`)}, + DestinationMessageAnnotations: &runtime.RawExtension{Raw: []byte(`{"a-key": "an-annotation"}`)}, DestinationQueue: "a-queue", PrefetchCount: 10, ReconnectDelay: 10, @@ -41,6 +43,7 @@ var _ = Describe("shovel webhook", func() { SourcePrefetchCount: 10, SourceProtocol: "amqp091", SourceQueue: "a-queue", + SourceConsumerArgs: &runtime.RawExtension{Raw: []byte(`{"x-priority": 1}`)}, RabbitmqClusterReference: RabbitmqClusterReference{ Name: "a-cluster", }, @@ -60,6 +63,20 @@ var _ = Describe("shovel webhook", func() { notAllowed.Spec.RabbitmqClusterReference.ConnectionSecret = nil Expect(apierrors.IsForbidden(notAllowed.ValidateCreate())).To(BeTrue()) }) + + It("spec.srcAddress must be set if spec.srcProtocol is amqp10", func() { + notValid := shovel.DeepCopy() + notValid.Spec.SourceProtocol = "amqp10" + notValid.Spec.SourceAddress = "" + Expect(apierrors.IsInvalid(notValid.ValidateCreate())).To(BeTrue()) + }) + + It("spec.destAddress must be set if spec.destProtocol is amqp10", func() { + notValid := shovel.DeepCopy() + notValid.Spec.DestinationProtocol = "amqp10" + notValid.Spec.DestinationAddress = "" + Expect(apierrors.IsInvalid(notValid.ValidateCreate())).To(BeTrue()) + }) }) Context("ValidateUpdate", func() { @@ -83,6 +100,20 @@ var _ = Describe("shovel webhook", func() { Expect(apierrors.IsForbidden(newShovel.ValidateUpdate(&shovel))).To(BeTrue()) }) + It("spec.srcAddress must be set if spec.srcProtocol is amqp10", func() { + newShovel := shovel.DeepCopy() + newShovel.Spec.SourceProtocol = "amqp10" + newShovel.Spec.SourceAddress = "" + Expect(apierrors.IsInvalid(newShovel.ValidateUpdate(&shovel))).To(BeTrue()) + }) + + It("spec.destAddress must be set if spec.destProtocol is amqp10", func() { + newShovel := shovel.DeepCopy() + newShovel.Spec.DestinationProtocol = "amqp10" + newShovel.Spec.DestinationAddress = "" + Expect(apierrors.IsInvalid(newShovel.ValidateUpdate(&shovel))).To(BeTrue()) + }) + It("does not allow updates on rabbitmqClusterReference.connectionSecret", func() { connectionScr := Shovel{ ObjectMeta: metav1.ObjectMeta{ @@ -156,7 +187,7 @@ var _ = Describe("shovel webhook", func() { It("allows updates on DestinationApplicationProperties", func() { newShovel := shovel.DeepCopy() - newShovel.Spec.DestinationApplicationProperties = "new-property" + newShovel.Spec.DestinationApplicationProperties = &runtime.RawExtension{Raw: []byte(`{"key": "new"}`)} Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed()) }) @@ -174,9 +205,10 @@ var _ = Describe("shovel webhook", func() { It("allows updates on DestinationProperties", func() { newShovel := shovel.DeepCopy() - newShovel.Spec.DestinationProperties = "new" + newShovel.Spec.DestinationProperties = &runtime.RawExtension{Raw: []byte(`{"key": "new"}`)} Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed()) }) + It("allows updates on DestinationProtocol", func() { newShovel := shovel.DeepCopy() newShovel.Spec.DestinationProtocol = "new" @@ -185,7 +217,13 @@ var _ = Describe("shovel webhook", func() { It("allows updates on DestinationPublishProperties", func() { newShovel := shovel.DeepCopy() - newShovel.Spec.DestinationPublishProperties = "new-property" + newShovel.Spec.DestinationPublishProperties = &runtime.RawExtension{Raw: []byte(`{"key": "new"}`)} + Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed()) + }) + + It("allows updates on DestinationMessageAnnotations", func() { + newShovel := shovel.DeepCopy() + newShovel.Spec.DestinationMessageAnnotations = &runtime.RawExtension{Raw: []byte(`{"key": "new-annotation"}`)} Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed()) }) @@ -248,5 +286,11 @@ var _ = Describe("shovel webhook", func() { newShovel.Spec.SourceQueue = "another-queue" Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed()) }) + + It("allows updates on SourceConsumerArgs", func() { + newShovel := shovel.DeepCopy() + newShovel.Spec.SourceConsumerArgs = &runtime.RawExtension{Raw: []byte(`{"x-priority": 10}`)} + Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed()) + }) }) }) diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 493f8367..0cbba484 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -859,6 +859,31 @@ func (in *ShovelSpec) DeepCopyInto(out *ShovelSpec) { *out = new(v1.LocalObjectReference) **out = **in } + if in.DestinationApplicationProperties != nil { + in, out := &in.DestinationApplicationProperties, &out.DestinationApplicationProperties + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + if in.DestinationProperties != nil { + in, out := &in.DestinationProperties, &out.DestinationProperties + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + if in.DestinationPublishProperties != nil { + in, out := &in.DestinationPublishProperties, &out.DestinationPublishProperties + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + if in.DestinationMessageAnnotations != nil { + in, out := &in.DestinationMessageAnnotations, &out.DestinationMessageAnnotations + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + if in.SourceConsumerArgs != nil { + in, out := &in.SourceConsumerArgs, &out.SourceConsumerArgs + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ShovelSpec. diff --git a/config/crd/bases/rabbitmq.com_bindings.yaml b/config/crd/bases/rabbitmq.com_bindings.yaml index 40306d8d..1ff3b391 100644 --- a/config/crd/bases/rabbitmq.com_bindings.yaml +++ b/config/crd/bases/rabbitmq.com_bindings.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: bindings.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_exchanges.yaml b/config/crd/bases/rabbitmq.com_exchanges.yaml index dcaf5316..1a9641fb 100644 --- a/config/crd/bases/rabbitmq.com_exchanges.yaml +++ b/config/crd/bases/rabbitmq.com_exchanges.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: exchanges.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_federations.yaml b/config/crd/bases/rabbitmq.com_federations.yaml index 96c98eaf..06118c85 100644 --- a/config/crd/bases/rabbitmq.com_federations.yaml +++ b/config/crd/bases/rabbitmq.com_federations.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: federations.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_permissions.yaml b/config/crd/bases/rabbitmq.com_permissions.yaml index fd2a1d57..4e753550 100644 --- a/config/crd/bases/rabbitmq.com_permissions.yaml +++ b/config/crd/bases/rabbitmq.com_permissions.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: permissions.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_policies.yaml b/config/crd/bases/rabbitmq.com_policies.yaml index 8df833f2..f0030d52 100644 --- a/config/crd/bases/rabbitmq.com_policies.yaml +++ b/config/crd/bases/rabbitmq.com_policies.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: policies.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_queues.yaml b/config/crd/bases/rabbitmq.com_queues.yaml index bb275f90..20279289 100644 --- a/config/crd/bases/rabbitmq.com_queues.yaml +++ b/config/crd/bases/rabbitmq.com_queues.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: queues.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_schemareplications.yaml b/config/crd/bases/rabbitmq.com_schemareplications.yaml index 3a9adc69..3ee54ea3 100644 --- a/config/crd/bases/rabbitmq.com_schemareplications.yaml +++ b/config/crd/bases/rabbitmq.com_schemareplications.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: schemareplications.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_shovels.yaml b/config/crd/bases/rabbitmq.com_shovels.yaml index 21c05e27..4b6f409e 100644 --- a/config/crd/bases/rabbitmq.com_shovels.yaml +++ b/config/crd/bases/rabbitmq.com_shovels.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: shovels.rabbitmq.com spec: group: rabbitmq.com @@ -56,17 +55,26 @@ spec: destAddress: type: string destApplicationProperties: - type: string + type: object + x-kubernetes-preserve-unknown-fields: true destExchange: type: string destExchangeKey: type: string + destMessageAnnotations: + type: object + x-kubernetes-preserve-unknown-fields: true destProperties: - type: string + type: object + x-kubernetes-preserve-unknown-fields: true destProtocol: + enum: + - amqp091 + - amqp10 type: string destPublishProperties: - type: string + type: object + x-kubernetes-preserve-unknown-fields: true destQueue: type: string name: @@ -103,6 +111,9 @@ spec: type: integer srcAddress: type: string + srcConsumerArgs: + type: object + x-kubernetes-preserve-unknown-fields: true srcDeleteAfter: type: string srcExchange: @@ -112,6 +123,9 @@ spec: srcPrefetchCount: type: integer srcProtocol: + enum: + - amqp091 + - amqp10 type: string srcQueue: type: string diff --git a/config/crd/bases/rabbitmq.com_superstreams.yaml b/config/crd/bases/rabbitmq.com_superstreams.yaml index c94e9ee2..825a66b3 100644 --- a/config/crd/bases/rabbitmq.com_superstreams.yaml +++ b/config/crd/bases/rabbitmq.com_superstreams.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: superstreams.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_topicpermissions.yaml b/config/crd/bases/rabbitmq.com_topicpermissions.yaml index 161ac1f9..77c17f9b 100644 --- a/config/crd/bases/rabbitmq.com_topicpermissions.yaml +++ b/config/crd/bases/rabbitmq.com_topicpermissions.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: topicpermissions.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_users.yaml b/config/crd/bases/rabbitmq.com_users.yaml index cc8c06d5..f69e2031 100644 --- a/config/crd/bases/rabbitmq.com_users.yaml +++ b/config/crd/bases/rabbitmq.com_users.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: users.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/crd/bases/rabbitmq.com_vhosts.yaml b/config/crd/bases/rabbitmq.com_vhosts.yaml index 9fe9ea73..cb70012b 100644 --- a/config/crd/bases/rabbitmq.com_vhosts.yaml +++ b/config/crd/bases/rabbitmq.com_vhosts.yaml @@ -3,8 +3,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: annotations: - controller-gen.kubebuilder.io/version: v0.11.3 - creationTimestamp: null + controller-gen.kubebuilder.io/version: v0.11.4 name: vhosts.rabbitmq.com spec: group: rabbitmq.com diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 81836ea1..6afac7d6 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -2,7 +2,6 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - creationTimestamp: null name: manager-role rules: - apiGroups: diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 3b91b44f..8c4c287e 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -2,7 +2,6 @@ apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration metadata: - creationTimestamp: null name: validating-webhook-configuration webhooks: - admissionReviewVersions: diff --git a/controllers/shovel_controller.go b/controllers/shovel_controller.go index 48e702bd..400a856d 100644 --- a/controllers/shovel_controller.go +++ b/controllers/shovel_controller.go @@ -29,7 +29,11 @@ func (r *ShovelReconciler) DeclareFunc(ctx context.Context, client rabbitmqclien if err != nil { return fmt.Errorf("failed to parse shovel uri secret; secret name: %s, error: %w", shovel.Spec.UriSecret.Name, err) } - return validateResponse(client.DeclareShovel(shovel.Spec.Vhost, shovel.Spec.Name, internal.GenerateShovelDefinition(shovel, srcUri, destUri))) + definition, err := internal.GenerateShovelDefinition(shovel, srcUri, destUri) + if err != nil { + return fmt.Errorf("failed to generate shovel definition: %w", err) + } + return validateResponse(client.DeclareShovel(shovel.Spec.Vhost, shovel.Spec.Name, *definition)) } func (r *ShovelReconciler) getUris(ctx context.Context, shovel *topology.Shovel) (string, string, error) { if shovel.Spec.UriSecret == nil { diff --git a/docs/api/rabbitmq.com.ref.asciidoc b/docs/api/rabbitmq.com.ref.asciidoc index 6d1f68e8..83fbddb0 100644 --- a/docs/api/rabbitmq.com.ref.asciidoc +++ b/docs/api/rabbitmq.com.ref.asciidoc @@ -36,7 +36,8 @@ SuperStream is the Schema for the queues API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1alpha1` | *`kind`* __string__ | `SuperStream` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha1-superstreamspec[$$SuperStreamSpec$$]__ | @@ -56,7 +57,8 @@ SuperStreamList contains a list of SuperStreams | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1alpha1` | *`kind`* __string__ | `SuperStreamList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1alpha1-superstream[$$SuperStream$$] array__ | @@ -151,7 +153,8 @@ Binding is the Schema for the bindings API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `Binding` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-bindingspec[$$BindingSpec$$]__ | @@ -171,7 +174,8 @@ BindingList contains a list of Binding | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `BindingList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-binding[$$Binding$$] array__ | @@ -278,7 +282,8 @@ Exchange is the Schema for the exchanges API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `Exchange` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-exchangespec[$$ExchangeSpec$$]__ | @@ -298,7 +303,8 @@ ExchangeList contains a list of Exchange | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `ExchangeList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-exchange[$$Exchange$$] array__ | @@ -361,7 +367,8 @@ Federation is the Schema for the federations API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `Federation` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-federationspec[$$FederationSpec$$]__ | @@ -381,7 +388,8 @@ FederationList contains a list of Federation | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `FederationList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-federation[$$Federation$$] array__ | @@ -450,7 +458,8 @@ Permission is the Schema for the permissions API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `Permission` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-permissionspec[$$PermissionSpec$$]__ | @@ -470,7 +479,8 @@ PermissionList contains a list of Permission | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `PermissionList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-permission[$$Permission$$] array__ | @@ -531,7 +541,8 @@ Policy is the Schema for the policies API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `Policy` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-policyspec[$$PolicySpec$$]__ | @@ -551,7 +562,8 @@ PolicyList contains a list of Policy | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `PolicyList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-policy[$$Policy$$] array__ | @@ -614,7 +626,8 @@ Queue is the Schema for the queues API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `Queue` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-queuespec[$$QueueSpec$$]__ | @@ -634,7 +647,8 @@ QueueList contains a list of Queue | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `QueueList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-queue[$$Queue$$] array__ | @@ -727,7 +741,8 @@ SchemaReplication is the Schema for the schemareplications API This feature requ | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `SchemaReplication` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-schemareplicationspec[$$SchemaReplicationSpec$$]__ | @@ -747,7 +762,8 @@ SchemaReplicationList contains a list of SchemaReplication | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `SchemaReplicationList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-schemareplication[$$SchemaReplication$$] array__ | @@ -824,7 +840,8 @@ Shovel is the Schema for the shovels API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `Shovel` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-shovelspec[$$ShovelSpec$$]__ | @@ -844,7 +861,8 @@ ShovelList contains a list of Shovel | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `ShovelList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-shovel[$$Shovel$$] array__ | @@ -874,12 +892,13 @@ ShovelSpec defines the desired state of Shovel For how to configure Shovel, see: | *`destAddForwardHeaders`* __boolean__ | | *`destAddTimestampHeader`* __boolean__ | | *`destAddress`* __string__ | -| *`destApplicationProperties`* __string__ | | *`destExchange`* __string__ | | *`destExchangeKey`* __string__ | -| *`destProperties`* __string__ | | *`destProtocol`* __string__ | -| *`destPublishProperties`* __string__ | +| *`destApplicationProperties`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ | +| *`destProperties`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ | +| *`destPublishProperties`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ | +| *`destMessageAnnotations`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ | | *`destQueue`* __string__ | | *`prefetchCount`* __integer__ | | *`reconnectDelay`* __integer__ | @@ -890,6 +909,7 @@ ShovelSpec defines the desired state of Shovel For how to configure Shovel, see: | *`srcPrefetchCount`* __integer__ | | *`srcProtocol`* __string__ | | *`srcQueue`* __string__ | +| *`srcConsumerArgs`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ | |=== @@ -926,7 +946,8 @@ TopicPermission is the Schema for the topicpermissions API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `TopicPermission` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-topicpermissionspec[$$TopicPermissionSpec$$]__ | @@ -965,7 +986,8 @@ TopicPermissionList contains a list of TopicPermission | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `TopicPermissionList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-topicpermission[$$TopicPermission$$] array__ | @@ -1028,7 +1050,8 @@ User is the Schema for the users API. | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `User` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-userspec[$$UserSpec$$]__ | Spec configures the desired state of the User object. @@ -1048,7 +1071,8 @@ UserList contains a list of Users. | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `UserList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-user[$$User$$] array__ | @@ -1138,7 +1162,8 @@ Vhost is the Schema for the vhosts API | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `Vhost` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#objectmeta-v1-meta[$$ObjectMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`spec`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-vhostspec[$$VhostSpec$$]__ | @@ -1158,7 +1183,8 @@ VhostList contains a list of Vhost | Field | Description | *`apiVersion`* __string__ | `rabbitmq.com/v1beta1` | *`kind`* __string__ | `VhostList` -| *`TypeMeta`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#typemeta-v1-meta[$$TypeMeta$$]__ | +| *`kind`* __string__ | Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds +| *`apiVersion`* __string__ | APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources | *`metadata`* __link:https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#listmeta-v1-meta[$$ListMeta$$]__ | Refer to Kubernetes API documentation for fields of `metadata`. | *`items`* __xref:{anchor_prefix}-github.aaakk.us.kg-rabbitmq-messaging-topology-operator-api-v1beta1-vhost[$$Vhost$$] array__ | diff --git a/go.mod b/go.mod index d2e3ea43..d4e2d3a2 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/google/uuid v1.3.0 github.com/hashicorp/vault/api v1.9.1 github.com/maxbrunsfeld/counterfeiter/v6 v6.6.1 - github.com/michaelklishin/rabbit-hole/v2 v2.13.0 + github.com/michaelklishin/rabbit-hole/v2 v2.15.0 github.com/onsi/ginkgo/v2 v2.9.5 github.com/onsi/gomega v1.27.7 github.com/rabbitmq/cluster-operator v1.14.0 diff --git a/go.sum b/go.sum index 2a1c713e..5f873620 100644 --- a/go.sum +++ b/go.sum @@ -312,8 +312,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.2 h1:hAHbPm5IJGijwng3PWk09 github.com/matttproud/golang_protobuf_extensions v1.0.2/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/maxbrunsfeld/counterfeiter/v6 v6.6.1 h1:9XE5ykDiC8eNSqIPkxx0EsV3kMX1oe4kQWRZjIgytUA= github.com/maxbrunsfeld/counterfeiter/v6 v6.6.1/go.mod h1:qbKwBR+qQODzH2WD/s53mdgp/xVcXMlJb59GRFOp6Z4= -github.com/michaelklishin/rabbit-hole/v2 v2.13.0 h1:4idonhoyXqm47o+A8Sf8LwN6GNOaDYqHWpdU/jqV1gE= -github.com/michaelklishin/rabbit-hole/v2 v2.13.0/go.mod h1:JGRQOHJaoefurvQP6oX4kLVJvv8hggrjGZ/4lTMpzpg= +github.com/michaelklishin/rabbit-hole/v2 v2.15.0 h1:asuENwbu5UsgPBHKgOzHY6VVrjNePurjJoE+8+EWeLA= +github.com/michaelklishin/rabbit-hole/v2 v2.15.0/go.mod h1:o0k0caEjRjboLEylRXVR7aOkuI2vZ6gLXZ78JyonVkA= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw= github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= @@ -357,6 +357,9 @@ github.com/onsi/ginkgo/v2 v2.4.0/go.mod h1:iHkDK1fKGcBoEHT5W7YBq4RFWaQulw+caOMkA github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJoeNwomw= github.com/onsi/ginkgo/v2 v2.7.0/go.mod h1:yjiuMwPokqY1XauOgju45q3sJt6VzQ/Fict1LFVcsAo= github.com/onsi/ginkgo/v2 v2.8.1/go.mod h1:N1/NbDngAFcSLdyZ+/aYTYGSlq9qMCS/cNKGJjy+csc= +github.com/onsi/ginkgo/v2 v2.9.0/go.mod h1:4xkjoL/tZv4SMWeww56BU5kAt19mVB47gTWxmrTcxyk= +github.com/onsi/ginkgo/v2 v2.9.1/go.mod h1:FEcmzVcCHl+4o9bQZVab+4dC9+j+91t2FHSzmGAPfuo= +github.com/onsi/ginkgo/v2 v2.9.2/go.mod h1:WHcJJG2dIlcCqVfBAwUCrJxSPFb6v4azBwgxeMeDuts= github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q= github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= @@ -370,6 +373,9 @@ github.com/onsi/gomega v1.24.0/go.mod h1:Z/NWtiqwBrwUt4/2loMmHL63EDLnYHmVbuBpDr2 github.com/onsi/gomega v1.24.1/go.mod h1:3AOiACssS3/MajrniINInwbfOOtfZvplPzuRSmvt1jM= github.com/onsi/gomega v1.26.0/go.mod h1:r+zV744Re+DiYCIPRlYOTxn0YkOLcAnW8k1xXdMPGhM= github.com/onsi/gomega v1.27.1/go.mod h1:aHX5xOykVYzWOV4WqQy0sy8BQptgukenXpCXfadcIAw= +github.com/onsi/gomega v1.27.3/go.mod h1:5vG284IBtfDAmDyrK+eGyZmUgUlmi+Wngqo557cZ6Gw= +github.com/onsi/gomega v1.27.4/go.mod h1:riYq/GJKh8hhoM01HN6Vmuy93AarCXCBGpvFDK3q3fQ= +github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= github.com/onsi/gomega v1.27.7 h1:fVih9JD6ogIiHUN6ePK7HJidyEDpWGVB5mzM7cWNXoU= github.com/onsi/gomega v1.27.7/go.mod h1:1p8OOlwo2iUUDsHnOrjE5UKYJ+e3W8eQ3qSlRahPmr4= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= @@ -410,8 +416,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= -github.com/rabbitmq/amqp091-go v1.7.0 h1:V5CF5qPem5OGSnEo8BoSbsDGwejg6VUJsKEdneaoTUo= -github.com/rabbitmq/amqp091-go v1.7.0/go.mod h1:wfClAtY0C7bOHxd3GjmF26jEHn+rR/0B3+YV+Vn9/NI= +github.com/rabbitmq/amqp091-go v1.8.1 h1:RejT1SBUim5doqcL6s7iN6SBmsQqyTgXb1xMlH0h1hA= +github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rabbitmq/cluster-operator v1.14.0 h1:1/nMyd9v/8T5IHA1BVcWbV0nrzN31F+gLP+0Ges6Y5E= github.com/rabbitmq/cluster-operator v1.14.0/go.mod h1:7XVU6ngbVJSPDXld+uMVk6nu68GH7fM6yYYY2MdYKek= github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= @@ -468,7 +474,6 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zmap/rc2 v0.0.0-20131011165748-24b9757f5521/go.mod h1:3YZ9o3WnatTIZhuOtot4IcUfzoKVjUHqu6WALIyI0nE= @@ -490,8 +495,8 @@ go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= -go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= @@ -540,12 +545,12 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.6.0/go.mod h1:4mET923SAdbXp2ki8ey+zGs1SLqsuM2Y0uvdZR/fUNI= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.10.0 h1:lFO9qtOdlre5W1jxS3r/4szv2/6iXxScdzjoBMXNhYk= golang.org/x/mod v0.10.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -596,6 +601,7 @@ golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -686,6 +692,7 @@ golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= @@ -696,6 +703,7 @@ golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= golang.org/x/term v0.3.0/go.mod h1:q750SLmJuPmVoN1blW3UFBPREJfb1KmY3vwxfr+nFDA= golang.org/x/term v0.4.0/go.mod h1:9P2UbLfCdcvo3p/nzKvsmas4TnlujnuoV9hGgYzW1lQ= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -711,6 +719,7 @@ golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -763,12 +772,12 @@ golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.2.0/go.mod h1:y4OqIKeOV/fWJetJ8bXPU1sEVniLMIyDAZWeHdV+NTA= golang.org/x/tools v0.4.0/go.mod h1:UE5sM2OK9E/d67R0ANs2xJizIymRP5gJU295PvKXxjQ= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s= golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo= golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/shovel_definition.go b/internal/shovel_definition.go index c29a87e0..d24da27b 100644 --- a/internal/shovel_definition.go +++ b/internal/shovel_definition.go @@ -1,13 +1,46 @@ package internal import ( + "encoding/json" + "fmt" rabbithole "github.com/michaelklishin/rabbit-hole/v2" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" "strings" ) -func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) rabbithole.ShovelDefinition { - return rabbithole.ShovelDefinition{ +func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) (*rabbithole.ShovelDefinition, error) { + srcConArgs := make(map[string]interface{}) + if s.Spec.SourceConsumerArgs != nil { + if err := json.Unmarshal(s.Spec.SourceConsumerArgs.Raw, &srcConArgs); err != nil { + return nil, fmt.Errorf("failed to unmarshall source consumer args: %v", err) + } + } + appProperties := make(map[string]interface{}) + if s.Spec.DestinationApplicationProperties != nil { + if err := json.Unmarshal(s.Spec.DestinationApplicationProperties.Raw, &appProperties); err != nil { + return nil, fmt.Errorf("failed to unmarshall destination application properties: %v", err) + } + } + destProperties := make(map[string]interface{}) + if s.Spec.DestinationProperties != nil { + if err := json.Unmarshal(s.Spec.DestinationProperties.Raw, &destProperties); err != nil { + return nil, fmt.Errorf("failed to unmarshall destination properties: %v", err) + } + } + destPubProperties := make(map[string]interface{}) + if s.Spec.DestinationPublishProperties != nil { + if err := json.Unmarshal(s.Spec.DestinationPublishProperties.Raw, &destPubProperties); err != nil { + return nil, fmt.Errorf("failed to unmarshall destination publish properties: %v", err) + } + } + destMsgAnnotations := make(map[string]interface{}) + if s.Spec.DestinationMessageAnnotations != nil { + if err := json.Unmarshal(s.Spec.DestinationMessageAnnotations.Raw, &destMsgAnnotations); err != nil { + return nil, fmt.Errorf("failed to unmarshall destination message annotations: %v", err) + } + } + + return &rabbithole.ShovelDefinition{ SourceURI: strings.Split(srcUri, ","), DestinationURI: strings.Split(destUri, ","), AckMode: s.Spec.AckMode, @@ -16,13 +49,14 @@ func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) rabbit DestinationAddForwardHeaders: s.Spec.DestinationAddForwardHeaders, DestinationAddTimestampHeader: s.Spec.DestinationAddTimestampHeader, DestinationAddress: s.Spec.DestinationAddress, - DestinationApplicationProperties: s.Spec.DestinationApplicationProperties, + DestinationApplicationProperties: appProperties, DestinationExchange: s.Spec.DestinationExchange, DestinationExchangeKey: s.Spec.DestinationExchangeKey, - DestinationProperties: s.Spec.DestinationProperties, + DestinationProperties: destProperties, DestinationProtocol: s.Spec.DestinationProtocol, - DestinationPublishProperties: s.Spec.DestinationPublishProperties, + DestinationPublishProperties: destPubProperties, DestinationQueue: s.Spec.DestinationQueue, + DestinationMessageAnnotations: destMsgAnnotations, PrefetchCount: s.Spec.PrefetchCount, ReconnectDelay: s.Spec.ReconnectDelay, SourceAddress: s.Spec.SourceAddress, @@ -32,6 +66,6 @@ func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) rabbit SourcePrefetchCount: s.Spec.SourcePrefetchCount, SourceProtocol: s.Spec.SourceProtocol, SourceQueue: s.Spec.SourceQueue, - } - + SourceConsumerArgs: srcConArgs, + }, nil } diff --git a/internal/shovel_definition_test.go b/internal/shovel_definition_test.go index caaef600..eec4b268 100644 --- a/internal/shovel_definition_test.go +++ b/internal/shovel_definition_test.go @@ -5,6 +5,7 @@ import ( . "github.com/onsi/gomega" topology "github.com/rabbitmq/messaging-topology-operator/api/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" ) var _ = Describe("GenerateShovelDefinition", func() { @@ -23,146 +24,185 @@ var _ = Describe("GenerateShovelDefinition", func() { }) It("sets source and destination uris correctly for a single uri", func() { - definition := GenerateShovelDefinition(shovel, "a-rabbitmq-src@test.com", "a-rabbitmq-dest@test.com") + definition, err := GenerateShovelDefinition(shovel, "a-rabbitmq-src@test.com", "a-rabbitmq-dest@test.com") + Expect(err).NotTo(HaveOccurred()) Expect(definition.SourceURI).To(ConsistOf("a-rabbitmq-src@test.com")) Expect(definition.DestinationURI).To(ConsistOf("a-rabbitmq-dest@test.com")) }) It("sets source and destination uris correctly for multiple uris", func() { - definition := GenerateShovelDefinition(shovel, "a-rabbitmq-src@test.com0,a-rabbitmq-src@test1.com", "a-rabbitmq-dest@test0.com,a-rabbitmq-dest@test1.com") + definition, err := GenerateShovelDefinition(shovel, "a-rabbitmq-src@test.com0,a-rabbitmq-src@test1.com", "a-rabbitmq-dest@test0.com,a-rabbitmq-dest@test1.com") + Expect(err).NotTo(HaveOccurred()) Expect(definition.SourceURI).To(ConsistOf("a-rabbitmq-src@test.com0", "a-rabbitmq-src@test1.com")) Expect(definition.DestinationURI).To(ConsistOf("a-rabbitmq-dest@test0.com", "a-rabbitmq-dest@test1.com")) }) It("sets 'AckMode' correctly", func() { shovel.Spec.AckMode = "on-publish" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.AckMode).To(Equal("on-publish")) }) It("sets 'AddForwardHeaders' correctly", func() { shovel.Spec.AddForwardHeaders = true - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.AddForwardHeaders).To(BeTrue()) }) It("sets 'DeleteAfter' correctly", func() { shovel.Spec.DeleteAfter = "never" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(string(definition.DeleteAfter)).To(Equal("never")) }) It("sets 'DestinationAddForwardHeaders' correctly", func() { shovel.Spec.DestinationAddForwardHeaders = true - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.DestinationAddForwardHeaders).To(BeTrue()) }) It("sets 'DestinationAddTimestampHeader' correctly", func() { shovel.Spec.DestinationAddTimestampHeader = true - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.DestinationAddTimestampHeader).To(BeTrue()) }) It("sets 'DestinationAddress' correctly", func() { shovel.Spec.DestinationAddress = "an-address" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.DestinationAddress).To(Equal("an-address")) }) It("sets 'DestinationApplicationProperties' correctly", func() { - shovel.Spec.DestinationApplicationProperties = "a-property" - definition := GenerateShovelDefinition(shovel, "", "") - Expect(definition.DestinationApplicationProperties).To(Equal("a-property")) + shovel.Spec.DestinationApplicationProperties = &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)} + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) + Expect(definition.DestinationApplicationProperties).To(HaveKeyWithValue("key", "a-property")) }) It("sets 'DestinationExchange' correctly", func() { shovel.Spec.DestinationExchange = "an-exchange" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.DestinationExchange).To(Equal("an-exchange")) }) It("sets 'DestinationExchangeKey' correctly", func() { shovel.Spec.DestinationExchangeKey = "a-key" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.DestinationExchangeKey).To(Equal("a-key")) }) It("sets 'DestinationProperties' correctly", func() { - shovel.Spec.DestinationProperties = "a-property" - definition := GenerateShovelDefinition(shovel, "", "") - Expect(definition.DestinationProperties).To(Equal("a-property")) + shovel.Spec.DestinationProperties = &runtime.RawExtension{Raw: []byte(`{"dest-property": "a-value"}`)} + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) + Expect(definition.DestinationProperties).To(HaveKeyWithValue("dest-property", "a-value")) }) It("sets 'DestinationProtocol' correctly", func() { shovel.Spec.DestinationProtocol = "amqp10" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.DestinationProtocol).To(Equal("amqp10")) }) It("sets 'DestinationPublishProperties' correctly", func() { - shovel.Spec.DestinationPublishProperties = "a-publish-property" - definition := GenerateShovelDefinition(shovel, "", "") - Expect(definition.DestinationPublishProperties).To(Equal("a-publish-property")) + shovel.Spec.DestinationPublishProperties = &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 1}`)} + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) + // Unmarshall stores float64 for JSON numbers + Expect(definition.DestinationPublishProperties).To(HaveKeyWithValue("delivery_mode", float64(1))) + }) + + It("sets 'DestinationMessageAnnotations' correctly", func() { + shovel.Spec.DestinationMessageAnnotations = &runtime.RawExtension{Raw: []byte(`{"key": "an-annotation-value"}`)} + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) + Expect(definition.DestinationMessageAnnotations).To(HaveKeyWithValue("key", "an-annotation-value")) }) It("sets 'DestinationQueue' correctly", func() { shovel.Spec.DestinationQueue = "a-destination-queue" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.DestinationQueue).To(Equal("a-destination-queue")) }) It("sets 'PrefetchCount' correctly", func() { shovel.Spec.PrefetchCount = 200 - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.PrefetchCount).To(Equal(200)) }) It("sets 'ReconnectDelay' correctly", func() { shovel.Spec.ReconnectDelay = 2000 - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.ReconnectDelay).To(Equal(2000)) }) It("sets 'SourceAddress' correctly", func() { shovel.Spec.SourceAddress = "an-address" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.SourceAddress).To(Equal("an-address")) }) It("sets 'SourceDeleteAfter' correctly", func() { shovel.Spec.SourceDeleteAfter = "10000000" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(string(definition.SourceDeleteAfter)).To(Equal("10000000")) }) It("sets 'SourceExchange' correctly", func() { shovel.Spec.SourceExchange = "an-source-exchange" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.SourceExchange).To(Equal("an-source-exchange")) }) It("sets 'SourceExchangeKey' correctly", func() { shovel.Spec.SourceExchangeKey = "an-source-exchange-key" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.SourceExchangeKey).To(Equal("an-source-exchange-key")) }) It("sets 'SourcePrefetchCount' correctly", func() { shovel.Spec.SourcePrefetchCount = 200 - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.SourcePrefetchCount).To(Equal(200)) }) It("sets 'SourceProtocol' correctly", func() { shovel.Spec.SourceProtocol = "amqp09" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.SourceProtocol).To(Equal("amqp09")) }) It("sets 'SourceQueue' correctly", func() { shovel.Spec.SourceQueue = "a-great-queue" - definition := GenerateShovelDefinition(shovel, "", "") + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) Expect(definition.SourceQueue).To(Equal("a-great-queue")) }) + + It("sets 'SourceConsumerArgs' correctly", func() { + shovel.Spec.SourceConsumerArgs = &runtime.RawExtension{Raw: []byte(`{"x-cancel-on-ha-failover": true}`)} + definition, err := GenerateShovelDefinition(shovel, "", "") + Expect(err).NotTo(HaveOccurred()) + Expect(definition.SourceConsumerArgs).To(HaveKeyWithValue("x-cancel-on-ha-failover", true)) + }) }) diff --git a/system_tests/shovel_system_test.go b/system_tests/shovel_system_test.go index c5c01ff5..ece77d9e 100644 --- a/system_tests/shovel_system_test.go +++ b/system_tests/shovel_system_test.go @@ -5,6 +5,7 @@ import ( rabbithole "github.com/michaelklishin/rabbit-hole/v2" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" @@ -39,16 +40,12 @@ var _ = Describe("Shovel", func() { shovel = &topology.Shovel{ ObjectMeta: metav1.ObjectMeta{ - Name: "shovel", Namespace: namespace, }, Spec: topology.ShovelSpec{ - Name: "my-upstream", - UriSecret: &corev1.LocalObjectReference{Name: shovelSecret.Name}, - DeleteAfter: "never", - SourceQueue: "a-queue", - DestinationQueue: "another-queue", - AckMode: "no-ack", + UriSecret: &corev1.LocalObjectReference{Name: shovelSecret.Name}, + SourceDeleteAfter: "never", + AckMode: "no-ack", RabbitmqClusterReference: topology.RabbitmqClusterReference{ Name: rmq.Name, }, @@ -61,27 +58,20 @@ var _ = Describe("Shovel", func() { }) It("works", func() { + shovel.Name = "shovel" + shovel.Spec.Name = "my-upstream" + shovel.Spec.SourceQueue = "a-queue" + shovel.Spec.SourceConsumerArgs = &runtime.RawExtension{Raw: []byte(`{"x-priority": 5}`)} + shovel.Spec.DestinationQueue = "another-queue" + shovel.Spec.DestinationPublishProperties = &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 2}`)} + By("declaring shovel successfully") - Expect(k8sClient.Create(ctx, shovel, &client.CreateOptions{})).To(Succeed()) - var shovelInfo *rabbithole.ShovelInfo - Eventually(func() error { - var err error - shovelInfo, err = rabbitClient.GetShovel("/", shovel.Spec.Name) - return err - }, 30, 2).Should(BeNil()) - - Expect(shovelInfo.Name).To(Equal(shovel.Spec.Name)) - Expect(shovelInfo.Vhost).To(Equal(shovel.Spec.Vhost)) - Expect(shovelInfo.Definition.SourceURI).To( - ConsistOf("amqp://server-test-src0", - "amqp://server-test-src1")) - Expect(shovelInfo.Definition.DestinationURI).To( - ConsistOf("amqp://server-test-dest0", - "amqp://server-test-dest1")) + shovelInfo := declareAssertShovelCommonProperties(ctx, shovel) + Expect(shovelInfo.Definition.DestinationQueue).To(Equal(shovel.Spec.DestinationQueue)) Expect(shovelInfo.Definition.SourceQueue).To(Equal(shovel.Spec.SourceQueue)) - Expect(shovelInfo.Definition.AckMode).To(Equal(shovel.Spec.AckMode)) - Expect(string(shovelInfo.Definition.DeleteAfter)).To(Equal(shovel.Spec.DeleteAfter)) + Expect(shovelInfo.Definition.SourceConsumerArgs).To(HaveKeyWithValue("x-priority", float64(5))) + Expect(shovelInfo.Definition.DestinationPublishProperties).To(HaveKeyWithValue("delivery_mode", float64(2))) By("updating status condition 'Ready'") updatedShovel := topology.Shovel{} @@ -109,7 +99,7 @@ var _ = Describe("Shovel", func() { updateTest.Spec.Name = "a-new-shovel" Expect(k8sClient.Update(ctx, &updateTest).Error()).To(ContainSubstring("spec.name: Forbidden: updates on name, vhost and rabbitmqClusterReference are all forbidden")) - By("updating shovel upstream parameters successfully") + By("updating shovel parameters successfully") Expect(k8sClient.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, shovel)).To(Succeed()) shovel.Spec.PrefetchCount = 200 Expect(k8sClient.Update(ctx, shovel, &client.UpdateOptions{})).To(Succeed()) @@ -122,11 +112,68 @@ var _ = Describe("Shovel", func() { By("deleting shovel configuration on deletion") Expect(k8sClient.Delete(ctx, shovel)).To(Succeed()) - var err error - Eventually(func() error { - _, err = rabbitClient.GetShovel("/", shovel.Spec.Name) - return err - }, 10).Should(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("Object Not Found")) + assertShovelDeleted(shovel) + }) + + It("works with a shovel using amqp10 protocol", func() { + shovel.Name = "shovel-amqp10" + shovel.Spec.Name = "my-upstream-amqp10" + shovel.Spec.SourceProtocol = "amqp10" + shovel.Spec.DestinationProtocol = "amqp10" + shovel.Spec.SourceAddress = "/an-exchange" + shovel.Spec.DestinationAddress = "/a-queue" + shovel.Spec.DestinationApplicationProperties = &runtime.RawExtension{Raw: []byte(`{"a-key": "a-value"}`)} + shovel.Spec.DestinationMessageAnnotations = &runtime.RawExtension{Raw: []byte(`{"a-key": "a-annotation"}`)} + shovel.Spec.DestinationProperties = &runtime.RawExtension{Raw: []byte(`{"content_type": "text/plain"}`)} + shovel.Spec.DestinationAddForwardHeaders = true + shovel.Spec.DestinationAddTimestampHeader = true + + By("declaring shovel successfully") + shovelInfo := declareAssertShovelCommonProperties(ctx, shovel) + + Expect(shovelInfo.Definition.SourceProtocol).To(Equal("amqp10")) + Expect(shovelInfo.Definition.DestinationProtocol).To(Equal("amqp10")) + Expect(shovelInfo.Definition.SourceAddress).To(Equal("/an-exchange")) + Expect(shovelInfo.Definition.DestinationAddress).To(Equal("/a-queue")) + Expect(shovelInfo.Definition.DestinationApplicationProperties).To(HaveKeyWithValue("a-key", "a-value")) + Expect(shovelInfo.Definition.DestinationMessageAnnotations).To(HaveKeyWithValue("a-key", "a-annotation")) + Expect(shovelInfo.Definition.DestinationProperties).To(HaveKeyWithValue("content_type", "text/plain")) + Expect(shovelInfo.Definition.DestinationAddForwardHeaders).To(BeTrue()) + Expect(shovelInfo.Definition.DestinationAddTimestampHeader).To(BeTrue()) + + By("deleting shovel configuration on deletion") + Expect(k8sClient.Delete(ctx, shovel)).To(Succeed()) + assertShovelDeleted(shovel) }) }) + +func declareAssertShovelCommonProperties(ctx context.Context, shovel *topology.Shovel) *rabbithole.ShovelInfo { + Expect(k8sClient.Create(ctx, shovel, &client.CreateOptions{})).To(Succeed()) + var shovelInfo *rabbithole.ShovelInfo + Eventually(func() error { + var err error + shovelInfo, err = rabbitClient.GetShovel("/", shovel.Spec.Name) + return err + }, 30, 2).Should(BeNil()) + + Expect(shovelInfo.Name).To(Equal(shovel.Spec.Name)) + Expect(shovelInfo.Vhost).To(Equal(shovel.Spec.Vhost)) + Expect(shovelInfo.Definition.SourceURI).To( + ConsistOf("amqp://server-test-src0", + "amqp://server-test-src1")) + Expect(shovelInfo.Definition.DestinationURI).To( + ConsistOf("amqp://server-test-dest0", + "amqp://server-test-dest1")) + Expect(shovelInfo.Definition.AckMode).To(Equal(shovel.Spec.AckMode)) + Expect(string(shovelInfo.Definition.SourceDeleteAfter)).To(Equal(shovel.Spec.SourceDeleteAfter)) + return shovelInfo +} + +func assertShovelDeleted(shovel *topology.Shovel) { + var err error + Eventually(func() error { + _, err = rabbitClient.GetShovel("/", shovel.Spec.Name) + return err + }, 10).Should(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("Object Not Found")) +}