Skip to content

Commit

Permalink
Add shovel amqp 1.0 system test
Browse files Browse the repository at this point in the history
- properties for different protocol amqp091 and amqp10
are different in shovel
  • Loading branch information
ChunyiLyu committed May 24, 2023
1 parent 37b9a77 commit a1a5f32
Showing 1 changed file with 73 additions and 35 deletions.
108 changes: 73 additions & 35 deletions system_tests/shovel_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,11 @@ var _ = Describe("Shovel", func() {

shovel = &topology.Shovel{
ObjectMeta: metav1.ObjectMeta{
Name: "shovel",
Namespace: namespace,
},
Spec: topology.ShovelSpec{
Name: "my-upstream",
UriSecret: &corev1.LocalObjectReference{Name: shovelSecret.Name},
DeleteAfter: "never",
SourceQueue: "a-queue",
SourceConsumerArgs: &runtime.RawExtension{
Raw: []byte(`{"x-priority": 5}`),
},
DestinationQueue: "another-queue",
DestinationPublishProperties: &runtime.RawExtension{
Raw: []byte(`{"delivery_mode": 2}`),
},
SourceDeleteAfter: "never",
AckMode: "no-ack",
RabbitmqClusterReference: topology.RabbitmqClusterReference{
Name: rmq.Name,
Expand All @@ -68,28 +58,19 @@ var _ = Describe("Shovel", func() {
})

It("works", func() {
shovel.Name = "shovel"
shovel.Spec.Name = "my-upstream"
shovel.Spec.SourceQueue = "a-queue"
shovel.Spec.SourceConsumerArgs = &runtime.RawExtension{Raw: []byte(`{"x-priority": 5}`)}
shovel.Spec.DestinationQueue = "another-queue"
shovel.Spec.DestinationPublishProperties = &runtime.RawExtension{Raw: []byte(`{"delivery_mode": 2}`)}

By("declaring shovel successfully")
Expect(k8sClient.Create(ctx, shovel, &client.CreateOptions{})).To(Succeed())
var shovelInfo *rabbithole.ShovelInfo
Eventually(func() error {
var err error
shovelInfo, err = rabbitClient.GetShovel("/", shovel.Spec.Name)
return err
}, 30, 2).Should(BeNil())

Expect(shovelInfo.Name).To(Equal(shovel.Spec.Name))
Expect(shovelInfo.Vhost).To(Equal(shovel.Spec.Vhost))
Expect(shovelInfo.Definition.SourceURI).To(
ConsistOf("amqp://server-test-src0",
"amqp://server-test-src1"))
Expect(shovelInfo.Definition.DestinationURI).To(
ConsistOf("amqp://server-test-dest0",
"amqp://server-test-dest1"))
shovelInfo := declareShovelandAssertCommonProperties(ctx, shovel)

Expect(shovelInfo.Definition.DestinationQueue).To(Equal(shovel.Spec.DestinationQueue))
Expect(shovelInfo.Definition.SourceQueue).To(Equal(shovel.Spec.SourceQueue))
Expect(shovelInfo.Definition.SourceConsumerArgs).To(HaveKeyWithValue("x-priority", float64(5)))
Expect(shovelInfo.Definition.AckMode).To(Equal(shovel.Spec.AckMode))
Expect(string(shovelInfo.Definition.DeleteAfter)).To(Equal(shovel.Spec.DeleteAfter))
Expect(shovelInfo.Definition.DestinationPublishProperties).To(HaveKeyWithValue("delivery_mode", float64(2)))

By("updating status condition 'Ready'")
Expand Down Expand Up @@ -131,11 +112,68 @@ var _ = Describe("Shovel", func() {

By("deleting shovel configuration on deletion")
Expect(k8sClient.Delete(ctx, shovel)).To(Succeed())
var err error
Eventually(func() error {
_, err = rabbitClient.GetShovel("/", shovel.Spec.Name)
return err
}, 10).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("Object Not Found"))
assertShovelConfigDeleted(shovel)
})

It("works with a shovel using amqp10 protocol", func() {
shovel.Name = "shovel-amqp10"
shovel.Spec.Name = "my-upstream-amqp10"
shovel.Spec.SourceProtocol = "amqp10"
shovel.Spec.DestinationProtocol = "amqp10"
shovel.Spec.SourceAddress = "/an-exchange"
shovel.Spec.DestinationAddress = "/a-queue"
shovel.Spec.DestinationApplicationProperties = &runtime.RawExtension{Raw: []byte(`{"a-key": "a-value"}`)}
shovel.Spec.DestinationMessageAnnotations = &runtime.RawExtension{Raw: []byte(`{"a-key": "a-annotation"}`)}
shovel.Spec.DestinationProperties = &runtime.RawExtension{Raw: []byte(`{"content_type": "text/plain"}`)}
shovel.Spec.DestinationAddForwardHeaders = true
shovel.Spec.DestinationAddTimestampHeader = true

By("declaring shovel successfully")
shovelInfo := declareShovelandAssertCommonProperties(ctx, shovel)

Expect(shovelInfo.Definition.SourceProtocol).To(Equal("amqp10"))
Expect(shovelInfo.Definition.DestinationProtocol).To(Equal("amqp10"))
Expect(shovelInfo.Definition.SourceAddress).To(Equal("/an-exchange"))
Expect(shovelInfo.Definition.DestinationAddress).To(Equal("/a-queue"))
Expect(shovelInfo.Definition.DestinationApplicationProperties).To(HaveKeyWithValue("a-key", "a-value"))
Expect(shovelInfo.Definition.DestinationMessageAnnotations).To(HaveKeyWithValue("a-key", "a-annotation"))
Expect(shovelInfo.Definition.DestinationProperties).To(HaveKeyWithValue("content_type", "text/plain"))
Expect(shovelInfo.Definition.DestinationAddForwardHeaders).To(BeTrue())
Expect(shovelInfo.Definition.DestinationAddTimestampHeader).To(BeTrue())

By("deleting shovel configuration on deletion")
Expect(k8sClient.Delete(ctx, shovel)).To(Succeed())
assertShovelConfigDeleted(shovel)
})
})

func declareShovelandAssertCommonProperties(ctx context.Context, shovel *topology.Shovel) *rabbithole.ShovelInfo {
Expect(k8sClient.Create(ctx, shovel, &client.CreateOptions{})).To(Succeed())
var shovelInfo *rabbithole.ShovelInfo
Eventually(func() error {
var err error
shovelInfo, err = rabbitClient.GetShovel("/", shovel.Spec.Name)
return err
}, 30, 2).Should(BeNil())

Expect(shovelInfo.Name).To(Equal(shovel.Spec.Name))
Expect(shovelInfo.Vhost).To(Equal(shovel.Spec.Vhost))
Expect(shovelInfo.Definition.SourceURI).To(
ConsistOf("amqp://server-test-src0",
"amqp://server-test-src1"))
Expect(shovelInfo.Definition.DestinationURI).To(
ConsistOf("amqp://server-test-dest0",
"amqp://server-test-dest1"))
Expect(shovelInfo.Definition.AckMode).To(Equal(shovel.Spec.AckMode))
Expect(string(shovelInfo.Definition.SourceDeleteAfter)).To(Equal(shovel.Spec.SourceDeleteAfter))
return shovelInfo
}

func assertShovelConfigDeleted(shovel *topology.Shovel) {
var err error
Eventually(func() error {
_, err = rabbitClient.GetShovel("/", shovel.Spec.Name)
return err
}, 10).Should(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("Object Not Found"))
}

0 comments on commit a1a5f32

Please sign in to comment.