From c3b509a622694d1b75c034963460829690465c0c Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Wed, 17 May 2023 15:28:25 +0100 Subject: [PATCH] Add missing shovel parameters - dest-queue-args, src-queue-args, src-consumer-args, and dest-message-annotations - other "missing" parameters 'add-timestamp-header' and 'publish-properties' are not added because their equivalent 'dest-add-timestamp-header' and 'dest-publish-properties' are already present --- rabbithole_test.go | 64 +++++++++++++++++++++++++++++++--------------- shovels.go | 4 +++ 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/rabbithole_test.go b/rabbithole_test.go index 0c85b3d..1f4b566 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -3479,17 +3479,21 @@ var _ = Describe("RabbitMQ HTTP API client", func() { sdu := URISet([]string{"amqp://127.0.0.1/%2f", "amqp://localhost/%2f"}) shovelDefinition := ShovelDefinition{ - SourceURI: ssu, - SourceAddress: "mySourceQueue", - SourceProtocol: "amqp10", - DestinationURI: sdu, - DestinationProtocol: "amqp10", - DestinationAddress: "myDestQueue", - DestinationAddForwardHeaders: true, - DestinationAddTimestampHeader: true, - AckMode: "on-confirm", - SourcePrefetchCount: 42, - SourceDeleteAfter: "never"} + AckMode: "on-confirm", + ReconnectDelay: 20, + SourceURI: ssu, + SourceAddress: "mySourceQueue", + SourceProtocol: "amqp10", + SourcePrefetchCount: 42, + SourceDeleteAfter: "never", + DestinationURI: sdu, + DestinationProtocol: "amqp10", + DestinationAddress: "myDestQueue", + DestinationAddForwardHeaders: true, + DestinationAddTimestampHeader: true, + DestinationApplicationProperties: map[string]interface{}{"key": "value"}, + DestinationMessageAnnotations: map[string]interface{}{"annotation": "something"}, + DestinationProperties: map[string]interface{}{"prop0": "value0"}} _, err := rmqc.DeclareShovel(vh, sn, shovelDefinition) Ω(err).Should(BeNil()) @@ -3506,17 +3510,21 @@ var _ = Describe("RabbitMQ HTTP API client", func() { Ω(x.Name).Should(Equal(sn)) Ω(x.Vhost).Should(Equal(vh)) Ω(x.Component).Should(Equal("shovel")) + Ω(x.Definition.AckMode).Should(Equal("on-confirm")) + Ω(x.Definition.ReconnectDelay).Should(Equal(20)) Ω(x.Definition.SourceAddress).Should(Equal("mySourceQueue")) Ω(x.Definition.SourceURI).Should(Equal(ssu)) Ω(x.Definition.SourcePrefetchCount).Should(Equal(42)) Ω(x.Definition.SourceProtocol).Should(Equal("amqp10")) + Ω(string(x.Definition.SourceDeleteAfter)).Should(Equal("never")) Ω(x.Definition.DestinationAddress).Should(Equal("myDestQueue")) Ω(x.Definition.DestinationURI).Should(Equal(sdu)) Ω(x.Definition.DestinationProtocol).Should(Equal("amqp10")) Ω(x.Definition.DestinationAddForwardHeaders).Should(Equal(true)) Ω(x.Definition.DestinationAddTimestampHeader).Should(Equal(true)) - Ω(x.Definition.AckMode).Should(Equal("on-confirm")) - Ω(string(x.Definition.SourceDeleteAfter)).Should(Equal("never")) + Ω(x.Definition.DestinationApplicationProperties).Should(HaveKeyWithValue("key", "value")) + Ω(x.Definition.DestinationMessageAnnotations).Should(HaveKeyWithValue("annotation", "something")) + Ω(x.Definition.DestinationProperties).Should(HaveKeyWithValue("prop0", "value0")) _, err = rmqc.DeleteShovel(vh, sn) Ω(err).Should(BeNil()) @@ -3544,13 +3552,20 @@ var _ = Describe("RabbitMQ HTTP API client", func() { sdu := URISet([]string{"amqp://127.0.0.1/%2f"}) shovelDefinition := ShovelDefinition{ - SourceURI: ssu, - SourceQueue: "mySourceQueue", - DestinationURI: sdu, - DestinationQueue: "myDestQueue", - AddForwardHeaders: true, - AckMode: "on-confirm", - DeleteAfter: "never"} + AckMode: "on-confirm", + ReconnectDelay: 20, + SourceURI: ssu, + SourceQueue: "mySourceQueue", + SourceQueueArgs: map[string]interface{}{"x-message-ttl": 12000}, + SourceConsumerArgs: map[string]interface{}{"x-priority": 2}, + SourcePrefetchCount: 5, + DestinationURI: sdu, + DestinationQueue: "myDestQueue", + DestinationQueueArgs: map[string]interface{}{"x-expires": 222000}, + AddForwardHeaders: true, + DestinationAddTimestampHeader: true, + DestinationPublishProperties: map[string]interface{}{"delivery_mode": 1}, + DeleteAfter: "never"} _, err := rmqc.DeclareShovel(vh, sn, shovelDefinition) Ω(err).Should(BeNil(), "Error declaring shovel") @@ -3567,12 +3582,19 @@ var _ = Describe("RabbitMQ HTTP API client", func() { Ω(x.Name).Should(Equal(sn)) Ω(x.Vhost).Should(Equal(vh)) Ω(x.Component).Should(Equal("shovel")) + Ω(x.Definition.AckMode).Should(Equal("on-confirm")) + Ω(x.Definition.ReconnectDelay).Should(Equal(20)) Ω(x.Definition.SourceURI).Should(Equal(ssu)) Ω(x.Definition.SourceQueue).Should(Equal("mySourceQueue")) + Ω(x.Definition.SourceQueueArgs).Should(HaveKeyWithValue("x-message-ttl", float64(12000))) + Ω(x.Definition.SourceConsumerArgs).Should(HaveKeyWithValue("x-priority", float64(2))) + Ω(x.Definition.SourcePrefetchCount).Should(Equal(5)) Ω(x.Definition.DestinationURI).Should(Equal(sdu)) Ω(x.Definition.DestinationQueue).Should(Equal("myDestQueue")) + Ω(x.Definition.DestinationQueueArgs).Should(HaveKeyWithValue("x-expires", float64(222000))) Ω(x.Definition.AddForwardHeaders).Should(Equal(true)) - Ω(x.Definition.AckMode).Should(Equal("on-confirm")) + Ω(x.Definition.DestinationAddTimestampHeader).Should(Equal(true)) + Ω(x.Definition.DestinationPublishProperties).Should(HaveKeyWithValue("delivery_mode", float64(1))) Ω(string(x.Definition.DeleteAfter)).Should(Equal("never")) _, err = rmqc.DeleteShovel(vh, sn) diff --git a/shovels.go b/shovels.go index 429b861..69f0871 100644 --- a/shovels.go +++ b/shovels.go @@ -84,6 +84,8 @@ type ShovelDefinition struct { DestinationProtocol string `json:"dest-protocol,omitempty"` DestinationPublishProperties map[string]interface{} `json:"dest-publish-properties,omitempty"` DestinationQueue string `json:"dest-queue,omitempty"` + DestinationQueueArgs map[string]interface{} `json:"dest-queue-args,omitempty"` + DestinationMessageAnnotations map[string]interface{} `json:"dest-message-annotations,omitempty"` PrefetchCount int `json:"prefetch-count,omitempty"` ReconnectDelay int `json:"reconnect-delay,omitempty"` SourceAddress string `json:"src-address,omitempty"` @@ -93,6 +95,8 @@ type ShovelDefinition struct { SourcePrefetchCount int `json:"src-prefetch-count,omitempty"` SourceProtocol string `json:"src-protocol,omitempty"` SourceQueue string `json:"src-queue,omitempty"` + SourceQueueArgs map[string]interface{} `json:"src-queue-args,omitempty"` + SourceConsumerArgs map[string]interface{} `json:"src-consumer-args,omitempty"` } // ShovelDefinitionDTO provides a data transfer object