Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RabbitMQ exchanges #3619

Merged
merged 3 commits into from
Jan 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions plugins/inputs/rabbitmq/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
## A list of queues to gather as the rabbitmq_queue measurement. If not
## specified, metrics for all queues are gathered.
# queues = ["telegraf"]

## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]
```

### Measurements & Fields:
Expand Down Expand Up @@ -95,6 +99,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- messages_redeliver_rate (float, messages per second)
- messages_unack (integer, count)

- rabbitmq_exchange
- messages_publish_in (int, count)
- messages_publish_out (int, count)

### Tags:

- All measurements have the following tags:
Expand All @@ -114,6 +122,15 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd
- durable
- auto_delete

- rabbitmq_exchange
- url
- exchange
- type
- vhost
- internal
- durable
- auto_delete

### Sample Queries:

Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query:
Expand All @@ -129,4 +146,5 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m)
rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,[email protected],durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000
rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000
rabbitmq_node,url=http://amqp.example.org:15672,[email protected],host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000
rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000
```
71 changes: 68 additions & 3 deletions plugins/inputs/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,9 @@ type RabbitMQ struct {
ResponseHeaderTimeout internal.Duration `toml:"header_timeout"`
ClientTimeout internal.Duration `toml:"client_timeout"`

Nodes []string
Queues []string
Nodes []string
Queues []string
Exchanges []string

Client *http.Client
}
Expand Down Expand Up @@ -78,6 +79,8 @@ type MessageStats struct {
PublishDetails Details `json:"publish_details"`
Redeliver int64
RedeliverDetails Details `json:"redeliver_details"`
PublishIn int64 `json:"publish_in"`
PublishOut int64 `json:"publish_out"`
}

// ObjectTotals ...
Expand Down Expand Up @@ -133,10 +136,20 @@ type Node struct {
SocketsUsed int64 `json:"sockets_used"`
}

type Exchange struct {
Name string
MessageStats `json:"message_stats"`
Type string
Internal bool
Vhost string
Durable bool
AutoDelete bool `json:"auto_delete"`
}

// gatherFunc ...
type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator)

var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues}
var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges}

var sampleConfig = `
## Management Plugin url. (default: http://localhost:15672)
Expand Down Expand Up @@ -171,6 +184,10 @@ var sampleConfig = `
## A list of queues to gather as the rabbitmq_queue measurement. If not
## specified, metrics for all queues are gathered.
# queues = ["telegraf"]

## A list of exchanges to gather as the rabbitmq_exchange measurement. If not
## specified, metrics for all exchanges are gathered.
# exchanges = ["telegraf"]
`

// SampleConfig ...
Expand Down Expand Up @@ -374,6 +391,40 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) {
}
}

func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) {
// Gather information about exchanges
exchanges := make([]Exchange, 0)
err := r.requestJSON("/api/exchanges", &exchanges)
if err != nil {
acc.AddError(err)
return
}

for _, exchange := range exchanges {
if !r.shouldGatherExchange(exchange) {
continue
}
tags := map[string]string{
"url": r.URL,
"exchange": exchange.Name,
"type": exchange.Type,
"vhost": exchange.Vhost,
"internal": strconv.FormatBool(exchange.Internal),
"durable": strconv.FormatBool(exchange.Durable),
"auto_delete": strconv.FormatBool(exchange.AutoDelete),
}

acc.AddFields(
"rabbitmq_exchange",
map[string]interface{}{
"messages_publish_in": exchange.MessageStats.PublishIn,
"messages_publish_out": exchange.MessageStats.PublishOut,
},
tags,
)
}
}

func (r *RabbitMQ) shouldGatherNode(node Node) bool {
if len(r.Nodes) == 0 {
return true
Expand Down Expand Up @@ -402,6 +453,20 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool {
return false
}

func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool {
if len(r.Exchanges) == 0 {
return true
}

for _, name := range r.Exchanges {
if name == exchange.Name {
return true
}
}

return false
}

func init() {
inputs.Add("rabbitmq", func() telegraf.Input {
return &RabbitMQ{
Expand Down
107 changes: 107 additions & 0 deletions plugins/inputs/rabbitmq/rabbitmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,102 @@ const sampleQueuesResponse = `
]
`

const sampleExchangesResponse = `
[
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "direct",
"vhost": "\/",
"name": ""
},
{
"message_stats": {
"publish_in_details": {
"rate": 0
},
"publish_in": 2,
"publish_out_details": {
"rate": 0
},
"publish_out": 1
},
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "fanout",
"vhost": "\/",
"name": "telegraf"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "direct",
"vhost": "\/",
"name": "amq.direct"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "fanout",
"vhost": "\/",
"name": "amq.fanout"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "headers",
"vhost": "\/",
"name": "amq.headers"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "headers",
"vhost": "\/",
"name": "amq.match"
},
{
"arguments": { },
"internal": true,
"auto_delete": false,
"durable": true,
"type": "topic",
"vhost": "\/",
"name": "amq.rabbitmq.log"
},
{
"arguments": { },
"internal": true,
"auto_delete": false,
"durable": true,
"type": "topic",
"vhost": "\/",
"name": "amq.rabbitmq.trace"
},
{
"arguments": { },
"internal": false,
"auto_delete": false,
"durable": true,
"type": "topic",
"vhost": "\/",
"name": "amq.topic"
}
]
`

func TestRabbitMQGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string
Expand All @@ -385,6 +481,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
rsp = sampleNodesResponse
case "/api/queues":
rsp = sampleQueuesResponse
case "/api/exchanges":
rsp = sampleExchangesResponse
default:
panic("Cannot handle request")
}
Expand Down Expand Up @@ -441,4 +539,13 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) {
}

assert.True(t, acc.HasMeasurement("rabbitmq_queue"))

exchangeIntMetrics := []string{
"messages_publish_in",
"messages_publish_out",
}

for _, metric := range exchangeIntMetrics {
assert.True(t, acc.HasInt64Field("rabbitmq_exchange", metric))
}
}