Skip to content

Commit

Permalink
Add missing shovel parameters
Browse files Browse the repository at this point in the history
- dest-queue-args, src-queue-args, src-consumer-args,
and dest-message-annotations
- other "missing" parameters 'add-timestamp-header' and
'publish-properties' are not added because their equivalent
'dest-add-timestamp-header' and 'dest-publish-properties'
are already present
  • Loading branch information
ChunyiLyu committed May 17, 2023
1 parent a10e6fb commit c3b509a
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 21 deletions.
64 changes: 43 additions & 21 deletions rabbithole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3479,17 +3479,21 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
sdu := URISet([]string{"amqp://127.0.0.1/%2f", "amqp://localhost/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
SourceAddress: "mySourceQueue",
SourceProtocol: "amqp10",
DestinationURI: sdu,
DestinationProtocol: "amqp10",
DestinationAddress: "myDestQueue",
DestinationAddForwardHeaders: true,
DestinationAddTimestampHeader: true,
AckMode: "on-confirm",
SourcePrefetchCount: 42,
SourceDeleteAfter: "never"}
AckMode: "on-confirm",
ReconnectDelay: 20,
SourceURI: ssu,
SourceAddress: "mySourceQueue",
SourceProtocol: "amqp10",
SourcePrefetchCount: 42,
SourceDeleteAfter: "never",
DestinationURI: sdu,
DestinationProtocol: "amqp10",
DestinationAddress: "myDestQueue",
DestinationAddForwardHeaders: true,
DestinationAddTimestampHeader: true,
DestinationApplicationProperties: map[string]interface{}{"key": "value"},
DestinationMessageAnnotations: map[string]interface{}{"annotation": "something"},
DestinationProperties: map[string]interface{}{"prop0": "value0"}}

_, err := rmqc.DeclareShovel(vh, sn, shovelDefinition)
Ω(err).Should(BeNil())
Expand All @@ -3506,17 +3510,21 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(x.Name).Should(Equal(sn))
Ω(x.Vhost).Should(Equal(vh))
Ω(x.Component).Should(Equal("shovel"))
Ω(x.Definition.AckMode).Should(Equal("on-confirm"))
Ω(x.Definition.ReconnectDelay).Should(Equal(20))
Ω(x.Definition.SourceAddress).Should(Equal("mySourceQueue"))
Ω(x.Definition.SourceURI).Should(Equal(ssu))
Ω(x.Definition.SourcePrefetchCount).Should(Equal(42))
Ω(x.Definition.SourceProtocol).Should(Equal("amqp10"))
Ω(string(x.Definition.SourceDeleteAfter)).Should(Equal("never"))
Ω(x.Definition.DestinationAddress).Should(Equal("myDestQueue"))
Ω(x.Definition.DestinationURI).Should(Equal(sdu))
Ω(x.Definition.DestinationProtocol).Should(Equal("amqp10"))
Ω(x.Definition.DestinationAddForwardHeaders).Should(Equal(true))
Ω(x.Definition.DestinationAddTimestampHeader).Should(Equal(true))
Ω(x.Definition.AckMode).Should(Equal("on-confirm"))
Ω(string(x.Definition.SourceDeleteAfter)).Should(Equal("never"))
Ω(x.Definition.DestinationApplicationProperties).Should(HaveKeyWithValue("key", "value"))
Ω(x.Definition.DestinationMessageAnnotations).Should(HaveKeyWithValue("annotation", "something"))
Ω(x.Definition.DestinationProperties).Should(HaveKeyWithValue("prop0", "value0"))

_, err = rmqc.DeleteShovel(vh, sn)
Ω(err).Should(BeNil())
Expand Down Expand Up @@ -3544,13 +3552,20 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
sdu := URISet([]string{"amqp://127.0.0.1/%2f"})

shovelDefinition := ShovelDefinition{
SourceURI: ssu,
SourceQueue: "mySourceQueue",
DestinationURI: sdu,
DestinationQueue: "myDestQueue",
AddForwardHeaders: true,
AckMode: "on-confirm",
DeleteAfter: "never"}
AckMode: "on-confirm",
ReconnectDelay: 20,
SourceURI: ssu,
SourceQueue: "mySourceQueue",
SourceQueueArgs: map[string]interface{}{"x-message-ttl": 12000},
SourceConsumerArgs: map[string]interface{}{"x-priority": 2},
SourcePrefetchCount: 5,
DestinationURI: sdu,
DestinationQueue: "myDestQueue",
DestinationQueueArgs: map[string]interface{}{"x-expires": 222000},
AddForwardHeaders: true,
DestinationAddTimestampHeader: true,
DestinationPublishProperties: map[string]interface{}{"delivery_mode": 1},
DeleteAfter: "never"}

_, err := rmqc.DeclareShovel(vh, sn, shovelDefinition)
Ω(err).Should(BeNil(), "Error declaring shovel")
Expand All @@ -3567,12 +3582,19 @@ var _ = Describe("RabbitMQ HTTP API client", func() {
Ω(x.Name).Should(Equal(sn))
Ω(x.Vhost).Should(Equal(vh))
Ω(x.Component).Should(Equal("shovel"))
Ω(x.Definition.AckMode).Should(Equal("on-confirm"))
Ω(x.Definition.ReconnectDelay).Should(Equal(20))
Ω(x.Definition.SourceURI).Should(Equal(ssu))
Ω(x.Definition.SourceQueue).Should(Equal("mySourceQueue"))
Ω(x.Definition.SourceQueueArgs).Should(HaveKeyWithValue("x-message-ttl", float64(12000)))
Ω(x.Definition.SourceConsumerArgs).Should(HaveKeyWithValue("x-priority", float64(2)))
Ω(x.Definition.SourcePrefetchCount).Should(Equal(5))
Ω(x.Definition.DestinationURI).Should(Equal(sdu))
Ω(x.Definition.DestinationQueue).Should(Equal("myDestQueue"))
Ω(x.Definition.DestinationQueueArgs).Should(HaveKeyWithValue("x-expires", float64(222000)))
Ω(x.Definition.AddForwardHeaders).Should(Equal(true))
Ω(x.Definition.AckMode).Should(Equal("on-confirm"))
Ω(x.Definition.DestinationAddTimestampHeader).Should(Equal(true))
Ω(x.Definition.DestinationPublishProperties).Should(HaveKeyWithValue("delivery_mode", float64(1)))
Ω(string(x.Definition.DeleteAfter)).Should(Equal("never"))

_, err = rmqc.DeleteShovel(vh, sn)
Expand Down
4 changes: 4 additions & 0 deletions shovels.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ type ShovelDefinition struct {
DestinationProtocol string `json:"dest-protocol,omitempty"`
DestinationPublishProperties map[string]interface{} `json:"dest-publish-properties,omitempty"`
DestinationQueue string `json:"dest-queue,omitempty"`
DestinationQueueArgs map[string]interface{} `json:"dest-queue-args,omitempty"`
DestinationMessageAnnotations map[string]interface{} `json:"dest-message-annotations,omitempty"`
PrefetchCount int `json:"prefetch-count,omitempty"`
ReconnectDelay int `json:"reconnect-delay,omitempty"`
SourceAddress string `json:"src-address,omitempty"`
Expand All @@ -93,6 +95,8 @@ type ShovelDefinition struct {
SourcePrefetchCount int `json:"src-prefetch-count,omitempty"`
SourceProtocol string `json:"src-protocol,omitempty"`
SourceQueue string `json:"src-queue,omitempty"`
SourceQueueArgs map[string]interface{} `json:"src-queue-args,omitempty"`
SourceConsumerArgs map[string]interface{} `json:"src-consumer-args,omitempty"`
}

// ShovelDefinitionDTO provides a data transfer object
Expand Down

0 comments on commit c3b509a

Please sign in to comment.