From 224fab358012ad519d1beb64d46837d1b2253f73 Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Tue, 16 May 2023 12:15:07 +0100 Subject: [PATCH 1/3] Update shovel props to map - dest-application-properties and dest-properties are map instead of string --- shovels.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/shovels.go b/shovels.go index 9e6f547..429b861 100644 --- a/shovels.go +++ b/shovels.go @@ -77,10 +77,10 @@ type ShovelDefinition struct { DestinationAddForwardHeaders bool `json:"dest-add-forward-headers,omitempty"` DestinationAddTimestampHeader bool `json:"dest-add-timestamp-header,omitempty"` DestinationAddress string `json:"dest-address,omitempty"` - DestinationApplicationProperties string `json:"dest-application-properties,omitempty"` + DestinationApplicationProperties map[string]interface{} `json:"dest-application-properties,omitempty"` DestinationExchange string `json:"dest-exchange,omitempty"` DestinationExchangeKey string `json:"dest-exchange-key,omitempty"` - DestinationProperties string `json:"dest-properties,omitempty"` + DestinationProperties map[string]interface{} `json:"dest-properties,omitempty"` DestinationProtocol string `json:"dest-protocol,omitempty"` DestinationPublishProperties map[string]interface{} `json:"dest-publish-properties,omitempty"` DestinationQueue string `json:"dest-queue,omitempty"` From a10e6fbde63317f460c43a6fb6c3655ec7454b44 Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Tue, 16 May 2023 12:16:03 +0100 Subject: [PATCH 2/3] Go fmt --- bindings.go | 2 +- doc.go | 379 ++++++++++++++++++++------------------- queues.go | 2 - rabbithole_suite_test.go | 2 +- rabbithole_test.go | 2 +- 5 files changed, 193 insertions(+), 194 deletions(-) diff --git a/bindings.go b/bindings.go index 614424e..3404c03 100644 --- a/bindings.go +++ b/bindings.go @@ -214,7 +214,7 @@ func (c *Client) bindingPath(vhost string, info BindingInfo) string { // DeleteBinding deletes an individual binding func (c *Client) DeleteBinding(vhost string, info BindingInfo) (res *http.Response, err error) { - req, err := newRequestWithBody(c, "DELETE", c.bindingPath(vhost, info) + + req, err := newRequestWithBody(c, "DELETE", c.bindingPath(vhost, info)+ "/"+url.PathEscape(info.PropertiesKey), nil) if err != nil { return nil, err diff --git a/doc.go b/doc.go index 6c92832..bd2dc46 100644 --- a/doc.go +++ b/doc.go @@ -4,280 +4,281 @@ Package rabbithole is a Go client for the RabbitMQ HTTP API. All HTTP API operations are accessible via `rabbithole.Client`, which should be instantiated with `rabbithole.NewClient`. - // URI, username, password - rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest") + // URI, username, password + rmqc, _ = NewClient("http://127.0.0.1:15672", "guest", "guest") Getting Overview - res, err := rmqc.Overview() + res, err := rmqc.Overview() Node and Cluster Status - var err error + var err error - // => []NodeInfo, err - xs, err := rmqc.ListNodes() + // => []NodeInfo, err + xs, err := rmqc.ListNodes() - node, err := rmqc.GetNode("rabbit@mercurio") - // => NodeInfo, err + node, err := rmqc.GetNode("rabbit@mercurio") + // => NodeInfo, err Operations on Connections - xs, err := rmqc.ListConnections() - // => []ConnectionInfo, err + xs, err := rmqc.ListConnections() + // => []ConnectionInfo, err - conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672") - // => ConnectionInfo, err + conn, err := rmqc.GetConnection("127.0.0.1:50545 -> 127.0.0.1:5672") + // => ConnectionInfo, err - // Forcefully close connection - _, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672") - // => *http.Response, err + // Forcefully close connection + _, err := rmqc.CloseConnection("127.0.0.1:50545 -> 127.0.0.1:5672") + // => *http.Response, err Operations on Channels - xs, err := rmqc.ListChannels() - // => []ChannelInfo, err + xs, err := rmqc.ListChannels() + // => []ChannelInfo, err - ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)") - // => ChannelInfo, err + ch, err := rmqc.GetChannel("127.0.0.1:50545 -> 127.0.0.1:5672 (1)") + // => ChannelInfo, err Operations on Exchanges - xs, err := rmqc.ListExchanges() - // => []ExchangeInfo, err + xs, err := rmqc.ListExchanges() + // => []ExchangeInfo, err - // list exchanges in a vhost - xs, err := rmqc.ListExchangesIn("/") - // => []ExchangeInfo, err + // list exchanges in a vhost + xs, err := rmqc.ListExchangesIn("/") + // => []ExchangeInfo, err - // information about individual exchange - x, err := rmqc.GetExchange("/", "amq.fanout") - // => ExchangeInfo, err + // information about individual exchange + x, err := rmqc.GetExchange("/", "amq.fanout") + // => ExchangeInfo, err - // declares an exchange - resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false}) - // => *http.Response, err + // declares an exchange + resp, err := rmqc.DeclareExchange("/", "an.exchange", ExchangeSettings{Type: "fanout", Durable: false}) + // => *http.Response, err - // deletes individual exchange - resp, err := rmqc.DeleteExchange("/", "an.exchange") - // => *http.Response, err + // deletes individual exchange + resp, err := rmqc.DeleteExchange("/", "an.exchange") + // => *http.Response, err Operations on Queues - xs, err := rmqc.ListQueues() - // => []QueueInfo, err + xs, err := rmqc.ListQueues() + // => []QueueInfo, err - // list queues in a vhost - xs, err := rmqc.ListQueuesIn("/") - // => []QueueInfo, err + // list queues in a vhost + xs, err := rmqc.ListQueuesIn("/") + // => []QueueInfo, err - // information about individual queue - x, err := rmqc.GetQueue("/", "a.queue") - // => QueueInfo, err + // information about individual queue + x, err := rmqc.GetQueue("/", "a.queue") + // => QueueInfo, err - // declares a queue - resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false}) - // => *http.Response, err + // declares a queue + resp, err := rmqc.DeclareQueue("/", "a.queue", QueueSettings{Durable: false}) + // => *http.Response, err - // deletes individual queue - resp, err := rmqc.DeleteQueue("/", "a.queue") - // => *http.Response, err + // deletes individual queue + resp, err := rmqc.DeleteQueue("/", "a.queue") + // => *http.Response, err - // purges all messages in queue - resp, err := rmqc.PurgeQueue("/", "a.queue") - // => *http.Response, err + // purges all messages in queue + resp, err := rmqc.PurgeQueue("/", "a.queue") + // => *http.Response, err Operations on Bindings - bs, err := rmqc.ListBindings() - // => []BindingInfo, err - - // list bindings in a vhost - bs, err := rmqc.ListBindingsIn("/") - // => []BindingInfo, err - - // list bindings of a queue - bs, err := rmqc.ListQueueBindings("/", "a.queue") - // => []BindingInfo, err - - // declare a binding - resp, err := rmqc.DeclareBinding("/", BindingInfo{ - Source: "an.exchange", - Destination: "a.queue", - DestinationType: "queue", - RoutingKey: "#", - }) - // => *http.Response, err - - // deletes individual binding - resp, err := rmqc.DeleteBinding("/", BindingInfo{ - Source: "an.exchange", - Destination: "a.queue", - DestinationType: "queue", - RoutingKey: "#", - PropertiesKey: "%23", - }) - // => *http.Response, err + bs, err := rmqc.ListBindings() + // => []BindingInfo, err + + // list bindings in a vhost + bs, err := rmqc.ListBindingsIn("/") + // => []BindingInfo, err + + // list bindings of a queue + bs, err := rmqc.ListQueueBindings("/", "a.queue") + // => []BindingInfo, err + + // declare a binding + resp, err := rmqc.DeclareBinding("/", BindingInfo{ + Source: "an.exchange", + Destination: "a.queue", + DestinationType: "queue", + RoutingKey: "#", + }) + // => *http.Response, err + + // deletes individual binding + resp, err := rmqc.DeleteBinding("/", BindingInfo{ + Source: "an.exchange", + Destination: "a.queue", + DestinationType: "queue", + RoutingKey: "#", + PropertiesKey: "%23", + }) + // => *http.Response, err Operations on Vhosts - xs, err := rmqc.ListVhosts() - // => []VhostInfo, err + xs, err := rmqc.ListVhosts() + // => []VhostInfo, err - // information about individual vhost - x, err := rmqc.GetVhost("/") - // => VhostInfo, err + // information about individual vhost + x, err := rmqc.GetVhost("/") + // => VhostInfo, err - // creates or updates individual vhost - resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false}) - // => *http.Response, err + // creates or updates individual vhost + resp, err := rmqc.PutVhost("/", VhostSettings{Tracing: false}) + // => *http.Response, err - // deletes individual vhost - resp, err := rmqc.DeleteVhost("/") - // => *http.Response, err + // deletes individual vhost + resp, err := rmqc.DeleteVhost("/") + // => *http.Response, err Managing Users - xs, err := rmqc.ListUsers() - // => []UserInfo, err + xs, err := rmqc.ListUsers() + // => []UserInfo, err - // information about individual user - x, err := rmqc.GetUser("my.user") - // => UserInfo, err + // information about individual user + x, err := rmqc.GetUser("my.user") + // => UserInfo, err - // creates or updates individual user - resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management policymaker"}) - // => *http.Response, err + // creates or updates individual user + resp, err := rmqc.PutUser("my.user", UserSettings{Password: "s3krE7", Tags: "management policymaker"}) + // => *http.Response, err - // deletes individual user - resp, err := rmqc.DeleteUser("my.user") - // => *http.Response, err + // deletes individual user + resp, err := rmqc.DeleteUser("my.user") + // => *http.Response, err Managing Permissions - xs, err := rmqc.ListPermissions() - // => []PermissionInfo, err + xs, err := rmqc.ListPermissions() + // => []PermissionInfo, err - // permissions of individual user - x, err := rmqc.ListPermissionsOf("my.user") - // => []PermissionInfo, err + // permissions of individual user + x, err := rmqc.ListPermissionsOf("my.user") + // => []PermissionInfo, err - // permissions of individual user in vhost - x, err := rmqc.GetPermissionsIn("/", "my.user") - // => PermissionInfo, err + // permissions of individual user in vhost + x, err := rmqc.GetPermissionsIn("/", "my.user") + // => PermissionInfo, err - // updates permissions of user in vhost - resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"}) - // => *http.Response, err + // updates permissions of user in vhost + resp, err := rmqc.UpdatePermissionsIn("/", "my.user", Permissions{Configure: ".*", Write: ".*", Read: ".*"}) + // => *http.Response, err - // revokes permissions in vhost - resp, err := rmqc.ClearPermissionsIn("/", "my.user") - // => *http.Response, err + // revokes permissions in vhost + resp, err := rmqc.ClearPermissionsIn("/", "my.user") + // => *http.Response, err Managing Topic Permissions - xs, err := rmqc.ListTopicPermissions() - // => []TopicPermissionInfo, err + xs, err := rmqc.ListTopicPermissions() + // => []TopicPermissionInfo, err - // permissions of individual user - x, err := rmqc.ListTopicPermissionsOf("my.user") - // => []TopicPermissionInfo, err + // permissions of individual user + x, err := rmqc.ListTopicPermissionsOf("my.user") + // => []TopicPermissionInfo, err - // permissions of individual user in vhost - x, err := rmqc.GetTopicPermissionsIn("/", "my.user") - // => []TopicPermissionInfo, err + // permissions of individual user in vhost + x, err := rmqc.GetTopicPermissionsIn("/", "my.user") + // => []TopicPermissionInfo, err - // updates permissions of user in vhost - resp, err := rmqc.UpdateTopicPermissionsIn("/", "my.user", Permissions{Exchange: "amq.topic", Write: ".*", Read: ".*"}) - // => *http.Response, err + // updates permissions of user in vhost + resp, err := rmqc.UpdateTopicPermissionsIn("/", "my.user", Permissions{Exchange: "amq.topic", Write: ".*", Read: ".*"}) + // => *http.Response, err - // revokes permissions in vhost - resp, err := rmqc.ClearTopicPermissionsIn("/", "my.user") - // => *http.Response, err + // revokes permissions in vhost + resp, err := rmqc.ClearTopicPermissionsIn("/", "my.user") + // => *http.Response, err - // revokes single permissions in vhost - resp, err := rmqc.DeleteTopicPermissionsIn("/", "my.user", "exchange") - // => *http.Response, err + // revokes single permissions in vhost + resp, err := rmqc.DeleteTopicPermissionsIn("/", "my.user", "exchange") + // => *http.Response, err Managing Runtime Parameters - // list all runtime parameters - params, err := rmqc.ListRuntimeParameters() - // => []RuntimeParameter, error + // list all runtime parameters + params, err := rmqc.ListRuntimeParameters() + // => []RuntimeParameter, error - // list all runtime parameters for a component - params, err := rmqc.ListRuntimeParametersFor("federation-upstream") - // => []RuntimeParameter, error + // list all runtime parameters for a component + params, err := rmqc.ListRuntimeParametersFor("federation-upstream") + // => []RuntimeParameter, error - // list runtime parameters in a vhost - params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/") - // => []RuntimeParameter, error + // list runtime parameters in a vhost + params, err := rmqc.ListRuntimeParametersIn("federation-upstream", "/") + // => []RuntimeParameter, error - // information about a runtime parameter - p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name") - // => *RuntimeParameter, error + // information about a runtime parameter + p, err := rmqc.GetRuntimeParameter("federation-upstream", "/", "name") + // => *RuntimeParameter, error - // declare or update a runtime parameter - resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{ - Uri: "amqp://server-name", - }) - // => *http.Response, error + // declare or update a runtime parameter + resp, err := rmqc.PutRuntimeParameter("federation-upstream", "/", "name", FederationDefinition{ + Uri: "amqp://server-name", + }) + // => *http.Response, error - // remove a runtime parameter - resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name") - // => *http.Response, error + // remove a runtime parameter + resp, err := rmqc.DeleteRuntimeParameter("federation-upstream", "/", "name") + // => *http.Response, error Managing Federation Upstreams - // list all federation upstreams - ups, err := rmqc.ListFederationUpstreams() - // => []FederationUpstream, error + // list all federation upstreams + ups, err := rmqc.ListFederationUpstreams() + // => []FederationUpstream, error - // list federation upstreams in a vhost - ups, err := rmqc.ListFederationUpstreamsIn("/") - // => []FederationUpstream, error + // list federation upstreams in a vhost + ups, err := rmqc.ListFederationUpstreamsIn("/") + // => []FederationUpstream, error - // information about a federated upstream - up, err := rmqc.GetFederationUpstream("/", "upstream-name") - // => *FederationUpstream, error + // information about a federated upstream + up, err := rmqc.GetFederationUpstream("/", "upstream-name") + // => *FederationUpstream, error - // declare or update a federation upstream - resp, err := rmqc.PutFederationUpstream("/", "upstream-name", FederationDefinition{ - Uri: "amqp://server-name", - }) - // => *http.Response, error + // declare or update a federation upstream + resp, err := rmqc.PutFederationUpstream("/", "upstream-name", FederationDefinition{ + Uri: "amqp://server-name", + }) + // => *http.Response, error - // delete an upstream - resp, err := rmqc.DeleteFederationUpstream("/", "upstream-name") - // => *http.Response, error + // delete an upstream + resp, err := rmqc.DeleteFederationUpstream("/", "upstream-name") + // => *http.Response, error Managing Global Parameters - // list all global parameters - params, err := rmqc.ListGlobalParameters() - // => []GlobalRuntimeParameter, error + // list all global parameters + params, err := rmqc.ListGlobalParameters() + // => []GlobalRuntimeParameter, error - // get a global parameter - p, err := rmqc.GetGlobalParameter("name") - // => *GlobalRuntimeParameter, error + // get a global parameter + p, err := rmqc.GetGlobalParameter("name") + // => *GlobalRuntimeParameter, error - // declare or update a global parameter - resp, err := rmqc.PutGlobalParameter("name", map[string]interface{ - endpoints: "amqp://server-name", - }) - // => *http.Response, error + // declare or update a global parameter + resp, err := rmqc.PutGlobalParameter("name", map[string]interface{ + endpoints: "amqp://server-name", + }) + // => *http.Response, error - // delete a global parameter - resp, err := rmqc.DeleteGlobalParameter("name") - // => *http.Response, error + // delete a global parameter + resp, err := rmqc.DeleteGlobalParameter("name") + // => *http.Response, error Operations on cluster name - // Get cluster name - cn, err := rmqc.GetClusterName() - // => ClusterName, err - // Rename cluster - resp, err := rmqc.SetClusterName(ClusterName{Name: "rabbitmq@rabbit-hole"}) - // => *http.Response, err + // Get cluster name + cn, err := rmqc.GetClusterName() + // => ClusterName, err + + // Rename cluster + resp, err := rmqc.SetClusterName(ClusterName{Name: "rabbitmq@rabbit-hole"}) + // => *http.Response, err */ package rabbithole diff --git a/queues.go b/queues.go index 34985c2..a13aac1 100644 --- a/queues.go +++ b/queues.go @@ -476,9 +476,7 @@ func (c *Client) CancelSyncQueue(vhost, queue string) (res *http.Response, err e return c.sendQueueAction(vhost, queue, queueAction{"cancel_sync"}) } -// // POST /api/queues/{vhost}/{name}/actions -// func (c *Client) sendQueueAction(vhost string, queue string, action queueAction) (res *http.Response, err error) { body, err := json.Marshal(action) if err != nil { diff --git a/rabbithole_suite_test.go b/rabbithole_suite_test.go index 73347bb..96c41a1 100644 --- a/rabbithole_suite_test.go +++ b/rabbithole_suite_test.go @@ -10,6 +10,6 @@ import ( func TestRabbitHole(t *testing.T) { RegisterFailHandler(Fail) - SetDefaultEventuallyTimeout(5*time.Second) + SetDefaultEventuallyTimeout(5 * time.Second) RunSpecs(t, "Rabbithole Suite") } diff --git a/rabbithole_test.go b/rabbithole_test.go index 66e4b4f..0c85b3d 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -1196,7 +1196,7 @@ var _ = Describe("RabbitMQ HTTP API client", func() { Ω(err).Should(BeNil()) }) When("A default queue type is set", func() { - It("creates a vhost with a default queue type", func(){ + It("creates a vhost with a default queue type", func() { vh := "rabbit/hole3" tags := VhostTags{"production", "eu-west-1"} vs := VhostSettings{Description: "rabbit/hole3 vhost", DefaultQueueType: "quorum", Tags: tags, Tracing: false} From c3b509a622694d1b75c034963460829690465c0c Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Wed, 17 May 2023 15:28:25 +0100 Subject: [PATCH 3/3] Add missing shovel parameters - dest-queue-args, src-queue-args, src-consumer-args, and dest-message-annotations - other "missing" parameters 'add-timestamp-header' and 'publish-properties' are not added because their equivalent 'dest-add-timestamp-header' and 'dest-publish-properties' are already present --- rabbithole_test.go | 64 +++++++++++++++++++++++++++++++--------------- shovels.go | 4 +++ 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/rabbithole_test.go b/rabbithole_test.go index 0c85b3d..1f4b566 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -3479,17 +3479,21 @@ var _ = Describe("RabbitMQ HTTP API client", func() { sdu := URISet([]string{"amqp://127.0.0.1/%2f", "amqp://localhost/%2f"}) shovelDefinition := ShovelDefinition{ - SourceURI: ssu, - SourceAddress: "mySourceQueue", - SourceProtocol: "amqp10", - DestinationURI: sdu, - DestinationProtocol: "amqp10", - DestinationAddress: "myDestQueue", - DestinationAddForwardHeaders: true, - DestinationAddTimestampHeader: true, - AckMode: "on-confirm", - SourcePrefetchCount: 42, - SourceDeleteAfter: "never"} + AckMode: "on-confirm", + ReconnectDelay: 20, + SourceURI: ssu, + SourceAddress: "mySourceQueue", + SourceProtocol: "amqp10", + SourcePrefetchCount: 42, + SourceDeleteAfter: "never", + DestinationURI: sdu, + DestinationProtocol: "amqp10", + DestinationAddress: "myDestQueue", + DestinationAddForwardHeaders: true, + DestinationAddTimestampHeader: true, + DestinationApplicationProperties: map[string]interface{}{"key": "value"}, + DestinationMessageAnnotations: map[string]interface{}{"annotation": "something"}, + DestinationProperties: map[string]interface{}{"prop0": "value0"}} _, err := rmqc.DeclareShovel(vh, sn, shovelDefinition) Ω(err).Should(BeNil()) @@ -3506,17 +3510,21 @@ var _ = Describe("RabbitMQ HTTP API client", func() { Ω(x.Name).Should(Equal(sn)) Ω(x.Vhost).Should(Equal(vh)) Ω(x.Component).Should(Equal("shovel")) + Ω(x.Definition.AckMode).Should(Equal("on-confirm")) + Ω(x.Definition.ReconnectDelay).Should(Equal(20)) Ω(x.Definition.SourceAddress).Should(Equal("mySourceQueue")) Ω(x.Definition.SourceURI).Should(Equal(ssu)) Ω(x.Definition.SourcePrefetchCount).Should(Equal(42)) Ω(x.Definition.SourceProtocol).Should(Equal("amqp10")) + Ω(string(x.Definition.SourceDeleteAfter)).Should(Equal("never")) Ω(x.Definition.DestinationAddress).Should(Equal("myDestQueue")) Ω(x.Definition.DestinationURI).Should(Equal(sdu)) Ω(x.Definition.DestinationProtocol).Should(Equal("amqp10")) Ω(x.Definition.DestinationAddForwardHeaders).Should(Equal(true)) Ω(x.Definition.DestinationAddTimestampHeader).Should(Equal(true)) - Ω(x.Definition.AckMode).Should(Equal("on-confirm")) - Ω(string(x.Definition.SourceDeleteAfter)).Should(Equal("never")) + Ω(x.Definition.DestinationApplicationProperties).Should(HaveKeyWithValue("key", "value")) + Ω(x.Definition.DestinationMessageAnnotations).Should(HaveKeyWithValue("annotation", "something")) + Ω(x.Definition.DestinationProperties).Should(HaveKeyWithValue("prop0", "value0")) _, err = rmqc.DeleteShovel(vh, sn) Ω(err).Should(BeNil()) @@ -3544,13 +3552,20 @@ var _ = Describe("RabbitMQ HTTP API client", func() { sdu := URISet([]string{"amqp://127.0.0.1/%2f"}) shovelDefinition := ShovelDefinition{ - SourceURI: ssu, - SourceQueue: "mySourceQueue", - DestinationURI: sdu, - DestinationQueue: "myDestQueue", - AddForwardHeaders: true, - AckMode: "on-confirm", - DeleteAfter: "never"} + AckMode: "on-confirm", + ReconnectDelay: 20, + SourceURI: ssu, + SourceQueue: "mySourceQueue", + SourceQueueArgs: map[string]interface{}{"x-message-ttl": 12000}, + SourceConsumerArgs: map[string]interface{}{"x-priority": 2}, + SourcePrefetchCount: 5, + DestinationURI: sdu, + DestinationQueue: "myDestQueue", + DestinationQueueArgs: map[string]interface{}{"x-expires": 222000}, + AddForwardHeaders: true, + DestinationAddTimestampHeader: true, + DestinationPublishProperties: map[string]interface{}{"delivery_mode": 1}, + DeleteAfter: "never"} _, err := rmqc.DeclareShovel(vh, sn, shovelDefinition) Ω(err).Should(BeNil(), "Error declaring shovel") @@ -3567,12 +3582,19 @@ var _ = Describe("RabbitMQ HTTP API client", func() { Ω(x.Name).Should(Equal(sn)) Ω(x.Vhost).Should(Equal(vh)) Ω(x.Component).Should(Equal("shovel")) + Ω(x.Definition.AckMode).Should(Equal("on-confirm")) + Ω(x.Definition.ReconnectDelay).Should(Equal(20)) Ω(x.Definition.SourceURI).Should(Equal(ssu)) Ω(x.Definition.SourceQueue).Should(Equal("mySourceQueue")) + Ω(x.Definition.SourceQueueArgs).Should(HaveKeyWithValue("x-message-ttl", float64(12000))) + Ω(x.Definition.SourceConsumerArgs).Should(HaveKeyWithValue("x-priority", float64(2))) + Ω(x.Definition.SourcePrefetchCount).Should(Equal(5)) Ω(x.Definition.DestinationURI).Should(Equal(sdu)) Ω(x.Definition.DestinationQueue).Should(Equal("myDestQueue")) + Ω(x.Definition.DestinationQueueArgs).Should(HaveKeyWithValue("x-expires", float64(222000))) Ω(x.Definition.AddForwardHeaders).Should(Equal(true)) - Ω(x.Definition.AckMode).Should(Equal("on-confirm")) + Ω(x.Definition.DestinationAddTimestampHeader).Should(Equal(true)) + Ω(x.Definition.DestinationPublishProperties).Should(HaveKeyWithValue("delivery_mode", float64(1))) Ω(string(x.Definition.DeleteAfter)).Should(Equal("never")) _, err = rmqc.DeleteShovel(vh, sn) diff --git a/shovels.go b/shovels.go index 429b861..69f0871 100644 --- a/shovels.go +++ b/shovels.go @@ -84,6 +84,8 @@ type ShovelDefinition struct { DestinationProtocol string `json:"dest-protocol,omitempty"` DestinationPublishProperties map[string]interface{} `json:"dest-publish-properties,omitempty"` DestinationQueue string `json:"dest-queue,omitempty"` + DestinationQueueArgs map[string]interface{} `json:"dest-queue-args,omitempty"` + DestinationMessageAnnotations map[string]interface{} `json:"dest-message-annotations,omitempty"` PrefetchCount int `json:"prefetch-count,omitempty"` ReconnectDelay int `json:"reconnect-delay,omitempty"` SourceAddress string `json:"src-address,omitempty"` @@ -93,6 +95,8 @@ type ShovelDefinition struct { SourcePrefetchCount int `json:"src-prefetch-count,omitempty"` SourceProtocol string `json:"src-protocol,omitempty"` SourceQueue string `json:"src-queue,omitempty"` + SourceQueueArgs map[string]interface{} `json:"src-queue-args,omitempty"` + SourceConsumerArgs map[string]interface{} `json:"src-consumer-args,omitempty"` } // ShovelDefinitionDTO provides a data transfer object