Skip to content

Commit

Permalink
Merge pull request #140 from thudry/add_consumers_support
Browse files Browse the repository at this point in the history
Add /api/consumers support
  • Loading branch information
michaelklishin authored Jul 25, 2019
2 parents 4b83e80 + 641510a commit ef44349
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 0 deletions.
65 changes: 65 additions & 0 deletions consumers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package rabbithole

import (
"net/url"
)

type AcknowledgementMode bool

const (
ManualAcknowledgement AcknowledgementMode = true
AutomaticAcknowledgment AcknowledgementMode = false
)

type BriefQueueInfo struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
}

type BriefChannelDetail struct {
ConnectionName string `json:"connection_name"`
Name string `json:"name"`
Node string `json:"node"`
Number int `json:"number"`
PeerHost string `json:"peer_host"`
PeerPort int `json:"peer_port"`
User string `json:"user"`
}

type ConsumerInfo struct {
Arguments map[string]interface{} `json:"arguments"`
AcknowledgementMode AcknowledgementMode `json:"ack_required"`
ChannelDetails BriefChannelDetail `json:"channel_details"`
ConsumerTag string `json:"consumer_tag"`
Exclusive bool `json:"exclusive"`
PrefetchCount int `json:"prefetch_count"`
Queue BriefQueueInfo `json:"queue"`
}

// ListConsumers lists all consumers in the cluster.
func (c *Client) ListConsumers() (rec []ConsumerInfo, err error) {
req, err := newGETRequest(c, "consumers")
if err != nil {
return
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return
}

return
}

// ListConsumersIn lists all consumers in a virtual host.
func (c *Client) ListConsumersIn(vhost string) (rec []ConsumerInfo, err error) {
req, err := newGETRequest(c, "consumers/"+url.PathEscape(vhost))
if err != nil {
return
}

if err = executeAndParseRequest(c, req, &rec); err != nil {
return
}

return
}
88 changes: 88 additions & 0 deletions rabbithole_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,94 @@ var _ = Describe("Rabbithole", func() {
})
})

Context("GET /consumers", func() {
It("returns decoded response", func() {
conn := openConnection("/")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

_, err = ch.QueueDeclare(
"", // name
false, // durable
false, // auto delete
true, // exclusive
false,
nil)
Ω(err).Should(BeNil())

_, err = ch.Consume(
"", // queue
"", // consumer
false, // auto ack
false, // exclusive
false, // no local
false, // no wait
amqp.Table{})
Ω(err).Should(BeNil())

// give internal events a moment to be
// handled
awaitEventPropagation()

cs, err := rmqc.ListConsumers()
Ω(err).Should(BeNil())

Ω(len(cs)).Should(Equal(1))
c := cs[0]
Ω(c.Queue.Name).ShouldNot(Equal(""))
Ω(c.ConsumerTag).ShouldNot(Equal(""))
Ω(c.Exclusive).ShouldNot(BeNil())
Ω(c.AcknowledgementMode).Should(Equal(ManualAcknowledgement))
})
})

Context("GET /consumers/{vhost}", func() {
It("returns decoded response", func() {
conn := openConnection("rabbit/hole")
defer conn.Close()

ch, err := conn.Channel()
Ω(err).Should(BeNil())
defer ch.Close()

_, err = ch.QueueDeclare(
"", // name
false, // durable
false, // auto delete
true, // exclusive
false,
nil)
Ω(err).Should(BeNil())

_, err = ch.Consume(
"", // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
amqp.Table{})
Ω(err).Should(BeNil())

// give internal events a moment to be
// handled
awaitEventPropagation()

cs, err := rmqc.ListConsumers()
Ω(err).Should(BeNil())

Ω(len(cs)).Should(Equal(1))
c := cs[0]
Ω(c.Queue.Name).ShouldNot(Equal(""))
Ω(c.ConsumerTag).ShouldNot(Equal(""))
Ω(c.Exclusive).ShouldNot(BeNil())
Ω(c.AcknowledgementMode).Should(Equal(AutomaticAcknowledgment))
})
})

Context("GET /users", func() {
It("returns decoded response", func() {
xs, err := rmqc.ListUsers()
Expand Down

0 comments on commit ef44349

Please sign in to comment.