diff --git a/consumers.go b/consumers.go new file mode 100644 index 0000000..ef3cd86 --- /dev/null +++ b/consumers.go @@ -0,0 +1,65 @@ +package rabbithole + +import ( + "net/url" +) + +type AcknowledgementMode bool + +const ( + ManualAcknowledgement AcknowledgementMode = true + AutomaticAcknowledgment AcknowledgementMode = false +) + +type BriefQueueInfo struct { + Name string `json:"name"` + Vhost string `json:"vhost"` +} + +type BriefChannelDetail struct { + ConnectionName string `json:"connection_name"` + Name string `json:"name"` + Node string `json:"node"` + Number int `json:"number"` + PeerHost string `json:"peer_host"` + PeerPort int `json:"peer_port"` + User string `json:"user"` +} + +type ConsumerInfo struct { + Arguments map[string]interface{} `json:"arguments"` + AcknowledgementMode AcknowledgementMode `json:"ack_required"` + ChannelDetails BriefChannelDetail `json:"channel_details"` + ConsumerTag string `json:"consumer_tag"` + Exclusive bool `json:"exclusive"` + PrefetchCount int `json:"prefetch_count"` + Queue BriefQueueInfo `json:"queue"` +} + +// ListConsumers lists all consumers in the cluster. +func (c *Client) ListConsumers() (rec []ConsumerInfo, err error) { + req, err := newGETRequest(c, "consumers") + if err != nil { + return + } + + if err = executeAndParseRequest(c, req, &rec); err != nil { + return + } + + return +} + +// ListConsumersIn lists all consumers in a virtual host. +func (c *Client) ListConsumersIn(vhost string) (rec []ConsumerInfo, err error) { + req, err := newGETRequest(c, "consumers/"+url.PathEscape(vhost)) + if err != nil { + return + } + + if err = executeAndParseRequest(c, req, &rec); err != nil { + return + } + + return +} diff --git a/rabbithole_test.go b/rabbithole_test.go index ece1b3e..8ae0f47 100644 --- a/rabbithole_test.go +++ b/rabbithole_test.go @@ -707,6 +707,94 @@ var _ = Describe("Rabbithole", func() { }) }) + Context("GET /consumers", func() { + It("returns decoded response", func() { + conn := openConnection("/") + defer conn.Close() + + ch, err := conn.Channel() + Ω(err).Should(BeNil()) + defer ch.Close() + + _, err = ch.QueueDeclare( + "", // name + false, // durable + false, // auto delete + true, // exclusive + false, + nil) + Ω(err).Should(BeNil()) + + _, err = ch.Consume( + "", // queue + "", // consumer + false, // auto ack + false, // exclusive + false, // no local + false, // no wait + amqp.Table{}) + Ω(err).Should(BeNil()) + + // give internal events a moment to be + // handled + awaitEventPropagation() + + cs, err := rmqc.ListConsumers() + Ω(err).Should(BeNil()) + + Ω(len(cs)).Should(Equal(1)) + c := cs[0] + Ω(c.Queue.Name).ShouldNot(Equal("")) + Ω(c.ConsumerTag).ShouldNot(Equal("")) + Ω(c.Exclusive).ShouldNot(BeNil()) + Ω(c.AcknowledgementMode).Should(Equal(ManualAcknowledgement)) + }) + }) + + Context("GET /consumers/{vhost}", func() { + It("returns decoded response", func() { + conn := openConnection("rabbit/hole") + defer conn.Close() + + ch, err := conn.Channel() + Ω(err).Should(BeNil()) + defer ch.Close() + + _, err = ch.QueueDeclare( + "", // name + false, // durable + false, // auto delete + true, // exclusive + false, + nil) + Ω(err).Should(BeNil()) + + _, err = ch.Consume( + "", // queue + "", // consumer + true, // auto ack + false, // exclusive + false, // no local + false, // no wait + amqp.Table{}) + Ω(err).Should(BeNil()) + + // give internal events a moment to be + // handled + awaitEventPropagation() + + cs, err := rmqc.ListConsumers() + Ω(err).Should(BeNil()) + + Ω(len(cs)).Should(Equal(1)) + c := cs[0] + Ω(c.Queue.Name).ShouldNot(Equal("")) + Ω(c.ConsumerTag).ShouldNot(Equal("")) + Ω(c.Exclusive).ShouldNot(BeNil()) + Ω(c.AcknowledgementMode).Should(Equal(AutomaticAcknowledgment)) + }) + }) + Context("GET /users", func() { It("returns decoded response", func() { xs, err := rmqc.ListUsers()