Skip to content

Commit

Permalink
Expose two new properties in shovel CRD
Browse files Browse the repository at this point in the history
- expose `dest-message-annotations` and
`src-consumer-args` in Shovel as maps
  • Loading branch information
ChunyiLyu committed May 23, 2023
1 parent 5282163 commit 273263c
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 17 deletions.
26 changes: 16 additions & 10 deletions api/v1beta1/shovel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,22 @@ type ShovelSpec struct {
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
DestinationPublishProperties *runtime.RawExtension `json:"destPublishProperties,omitempty"`
DestinationQueue string `json:"destQueue,omitempty"`
PrefetchCount int `json:"prefetchCount,omitempty"`
ReconnectDelay int `json:"reconnectDelay,omitempty"`
SourceAddress string `json:"srcAddress,omitempty"`
SourceDeleteAfter string `json:"srcDeleteAfter,omitempty"`
SourceExchange string `json:"srcExchange,omitempty"`
SourceExchangeKey string `json:"srcExchangeKey,omitempty"`
SourcePrefetchCount int `json:"srcPrefetchCount,omitempty"`
SourceProtocol string `json:"srcProtocol,omitempty"`
SourceQueue string `json:"srcQueue,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
DestinationMessageAnnotations *runtime.RawExtension `json:"destMessageAnnotations,omitempty"`
DestinationQueue string `json:"destQueue,omitempty"`
PrefetchCount int `json:"prefetchCount,omitempty"`
ReconnectDelay int `json:"reconnectDelay,omitempty"`
SourceAddress string `json:"srcAddress,omitempty"`
SourceDeleteAfter string `json:"srcDeleteAfter,omitempty"`
SourceExchange string `json:"srcExchange,omitempty"`
SourceExchangeKey string `json:"srcExchangeKey,omitempty"`
SourcePrefetchCount int `json:"srcPrefetchCount,omitempty"`
SourceProtocol string `json:"srcProtocol,omitempty"`
SourceQueue string `json:"srcQueue,omitempty"`
// +kubebuilder:validation:Type=object
// +kubebuilder:pruning:PreserveUnknownFields
SourceConsumerArgs *runtime.RawExtension `json:"srcConsumerArgs,omitempty"`
}

// ShovelStatus defines the observed state of Shovel
Expand Down
14 changes: 14 additions & 0 deletions api/v1beta1/shovel_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ var _ = Describe("shovel webhook", func() {
DestinationProperties: &runtime.RawExtension{Raw: []byte(`{"key": "a-property"}`)},
DestinationProtocol: "amqp091",
DestinationPublishProperties: &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 1}`)},
DestinationMessageAnnotations: &runtime.RawExtension{Raw: []byte(`{"a-key": "an-annotation"}`)},
DestinationQueue: "a-queue",
PrefetchCount: 10,
ReconnectDelay: 10,
Expand All @@ -42,6 +43,7 @@ var _ = Describe("shovel webhook", func() {
SourcePrefetchCount: 10,
SourceProtocol: "amqp091",
SourceQueue: "a-queue",
SourceConsumerArgs: &runtime.RawExtension{Raw: []byte(`{"x-priority": 1}`)},
RabbitmqClusterReference: RabbitmqClusterReference{
Name: "a-cluster",
},
Expand Down Expand Up @@ -190,6 +192,12 @@ var _ = Describe("shovel webhook", func() {
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})

It("allows updates on DestinationMessageAnnotations", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationMessageAnnotations = &runtime.RawExtension{Raw: []byte(`{"key": "new-annotation"}`)}
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})

It("allows updates on DestinationQueue", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.DestinationQueue = "another-queue"
Expand Down Expand Up @@ -249,5 +257,11 @@ var _ = Describe("shovel webhook", func() {
newShovel.Spec.SourceQueue = "another-queue"
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})

It("allows updates on SourceConsumerArgs", func() {
newShovel := shovel.DeepCopy()
newShovel.Spec.SourceConsumerArgs = &runtime.RawExtension{Raw: []byte(`{"x-priority": 10}`)}
Expect(newShovel.ValidateUpdate(&shovel)).To(Succeed())
})
})
})
10 changes: 10 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions config/crd/bases/rabbitmq.com_shovels.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ spec:
type: string
destExchangeKey:
type: string
destMessageAnnotations:
type: object
x-kubernetes-preserve-unknown-fields: true
destProperties:
type: object
x-kubernetes-preserve-unknown-fields: true
Expand Down Expand Up @@ -105,6 +108,9 @@ spec:
type: integer
srcAddress:
type: string
srcConsumerArgs:
type: object
x-kubernetes-preserve-unknown-fields: true
srcDeleteAfter:
type: string
srcExchange:
Expand Down
2 changes: 2 additions & 0 deletions docs/api/rabbitmq.com.ref.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,7 @@ ShovelSpec defines the desired state of Shovel For how to configure Shovel, see:
| *`destApplicationProperties`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ |
| *`destProperties`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ |
| *`destPublishProperties`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ |
| *`destMessageAnnotations`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ |
| *`destQueue`* __string__ |
| *`prefetchCount`* __integer__ |
| *`reconnectDelay`* __integer__ |
Expand All @@ -908,6 +909,7 @@ ShovelSpec defines the desired state of Shovel For how to configure Shovel, see:
| *`srcPrefetchCount`* __integer__ |
| *`srcProtocol`* __string__ |
| *`srcQueue`* __string__ |
| *`srcConsumerArgs`* __xref:{anchor_prefix}-k8s-io-apimachinery-pkg-runtime-rawextension[$$RawExtension$$]__ |
|===


Expand Down
16 changes: 15 additions & 1 deletion internal/shovel_definition.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
)

func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) (*rabbithole.ShovelDefinition, error) {
srcConArgs := make(map[string]interface{})
if s.Spec.SourceConsumerArgs != nil {
if err := json.Unmarshal(s.Spec.SourceConsumerArgs.Raw, &srcConArgs); err != nil {
return nil, fmt.Errorf("failed to unmarshall: %v", err)
}
}
appProperties := make(map[string]interface{})
if s.Spec.DestinationApplicationProperties != nil {
if err := json.Unmarshal(s.Spec.DestinationApplicationProperties.Raw, &appProperties); err != nil {
Expand All @@ -21,13 +27,19 @@ func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) (*rabb
return nil, fmt.Errorf("failed to unmarshall: %v", err)
}
}

destPubProperties := make(map[string]interface{})
if s.Spec.DestinationPublishProperties != nil {
if err := json.Unmarshal(s.Spec.DestinationPublishProperties.Raw, &destPubProperties); err != nil {
return nil, fmt.Errorf("failed to unmarshall: %v", err)
}
}
destMsgAnnotations := make(map[string]interface{})
if s.Spec.DestinationMessageAnnotations != nil {
if err := json.Unmarshal(s.Spec.DestinationMessageAnnotations.Raw, &destMsgAnnotations); err != nil {
return nil, fmt.Errorf("failed to unmarshall: %v", err)
}
}

return &rabbithole.ShovelDefinition{
SourceURI: strings.Split(srcUri, ","),
DestinationURI: strings.Split(destUri, ","),
Expand All @@ -44,6 +56,7 @@ func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) (*rabb
DestinationProtocol: s.Spec.DestinationProtocol,
DestinationPublishProperties: destPubProperties,
DestinationQueue: s.Spec.DestinationQueue,
DestinationMessageAnnotations: destMsgAnnotations,
PrefetchCount: s.Spec.PrefetchCount,
ReconnectDelay: s.Spec.ReconnectDelay,
SourceAddress: s.Spec.SourceAddress,
Expand All @@ -53,5 +66,6 @@ func GenerateShovelDefinition(s *topology.Shovel, srcUri, destUri string) (*rabb
SourcePrefetchCount: s.Spec.SourcePrefetchCount,
SourceProtocol: s.Spec.SourceProtocol,
SourceQueue: s.Spec.SourceQueue,
SourceConsumerArgs: srcConArgs,
}, nil
}
14 changes: 14 additions & 0 deletions internal/shovel_definition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ var _ = Describe("GenerateShovelDefinition", func() {
Expect(definition.DestinationPublishProperties).To(HaveKeyWithValue("delivery_mode", float64(1)))
})

It("sets 'DestinationMessageAnnotations' correctly", func() {
shovel.Spec.DestinationMessageAnnotations = &runtime.RawExtension{Raw: []byte(`{"key": "an-annotation-value"}`)}
definition, err := GenerateShovelDefinition(shovel, "", "")
Expect(err).NotTo(HaveOccurred())
Expect(definition.DestinationMessageAnnotations).To(HaveKeyWithValue("key", "an-annotation-value"))
})

It("sets 'DestinationQueue' correctly", func() {
shovel.Spec.DestinationQueue = "a-destination-queue"
definition, err := GenerateShovelDefinition(shovel, "", "")
Expand Down Expand Up @@ -190,4 +197,11 @@ var _ = Describe("GenerateShovelDefinition", func() {
Expect(err).NotTo(HaveOccurred())
Expect(definition.SourceQueue).To(Equal("a-great-queue"))
})

It("sets 'SourceConsumerArgs' correctly", func() {
shovel.Spec.SourceConsumerArgs = &runtime.RawExtension{Raw: []byte(`{"x-cancel-on-ha-failover": true}`)}
definition, err := GenerateShovelDefinition(shovel, "", "")
Expect(err).NotTo(HaveOccurred())
Expect(definition.SourceConsumerArgs).To(HaveKeyWithValue("x-cancel-on-ha-failover", true))
})
})
25 changes: 19 additions & 6 deletions system_tests/shovel_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
rabbithole "github.com/michaelklishin/rabbit-hole/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -43,12 +44,21 @@ var _ = Describe("Shovel", func() {
Namespace: namespace,
},
Spec: topology.ShovelSpec{
Name: "my-upstream",
UriSecret: &corev1.LocalObjectReference{Name: shovelSecret.Name},
DeleteAfter: "never",
SourceQueue: "a-queue",
Name: "my-upstream",
UriSecret: &corev1.LocalObjectReference{Name: shovelSecret.Name},
DeleteAfter: "never",
SourceQueue: "a-queue",
SourceConsumerArgs: &runtime.RawExtension{
Raw: []byte(`{"x-priority": 5}`),
},
DestinationQueue: "another-queue",
AckMode: "no-ack",
DestinationPublishProperties: &runtime.RawExtension{
Raw: []byte(`{"delivery_mode": "2"}`),
},
DestinationMessageAnnotations: &runtime.RawExtension{
Raw: []byte(`{"key": "value"}`),
},
AckMode: "no-ack",
RabbitmqClusterReference: topology.RabbitmqClusterReference{
Name: rmq.Name,
},
Expand Down Expand Up @@ -80,8 +90,11 @@ var _ = Describe("Shovel", func() {
"amqp://server-test-dest1"))
Expect(shovelInfo.Definition.DestinationQueue).To(Equal(shovel.Spec.DestinationQueue))
Expect(shovelInfo.Definition.SourceQueue).To(Equal(shovel.Spec.SourceQueue))
Expect(shovelInfo.Definition.SourceConsumerArgs).To(HaveKeyWithValue("x-priority", 5))
Expect(shovelInfo.Definition.AckMode).To(Equal(shovel.Spec.AckMode))
Expect(string(shovelInfo.Definition.DeleteAfter)).To(Equal(shovel.Spec.DeleteAfter))
Expect(shovelInfo.Definition.DestinationPublishProperties).To(HaveKeyWithValue("delivery_mode", 2))
Expect(shovelInfo.Definition.DestinationMessageAnnotations).To(HaveKeyWithValue("key", "value"))

By("updating status condition 'Ready'")
updatedShovel := topology.Shovel{}
Expand Down Expand Up @@ -109,7 +122,7 @@ var _ = Describe("Shovel", func() {
updateTest.Spec.Name = "a-new-shovel"
Expect(k8sClient.Update(ctx, &updateTest).Error()).To(ContainSubstring("spec.name: Forbidden: updates on name, vhost and rabbitmqClusterReference are all forbidden"))

By("updating shovel upstream parameters successfully")
By("updating shovel parameters successfully")
Expect(k8sClient.Get(ctx, types.NamespacedName{Name: shovel.Name, Namespace: shovel.Namespace}, shovel)).To(Succeed())
shovel.Spec.PrefetchCount = 200
Expect(k8sClient.Update(ctx, shovel, &client.UpdateOptions{})).To(Succeed())
Expand Down

0 comments on commit 273263c

Please sign in to comment.