From 69549bdf115bb34723f135984ec823a4a852a18c Mon Sep 17 00:00:00 2001 From: kerams Date: Fri, 22 Dec 2017 23:14:02 +0100 Subject: [PATCH 1/3] Add rabbitmq_exchange measurement --- plugins/inputs/rabbitmq/README.md | 17 ++++ plugins/inputs/rabbitmq/rabbitmq.go | 69 ++++++++++++++- plugins/inputs/rabbitmq/rabbitmq_test.go | 107 +++++++++++++++++++++++ 3 files changed, 190 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/rabbitmq/README.md b/plugins/inputs/rabbitmq/README.md index 83e4bd2ee55fd..d5e2eb5f68b9a 100644 --- a/plugins/inputs/rabbitmq/README.md +++ b/plugins/inputs/rabbitmq/README.md @@ -40,6 +40,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd ## A list of queues to gather as the rabbitmq_queue measurement. If not ## specified, metrics for all queues are gathered. # queues = ["telegraf"] + + ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not + ## specified, metrics for all exchanges are gathered. + # exchanges = ["telegraf"] ``` ### Measurements & Fields: @@ -95,6 +99,10 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - messages_redeliver_rate (float, messages per second) - messages_unack (integer, count) +- rabbitmq_exchange + - messages_publish_in (int, count) + - messages_publish_out (int, count) + ### Tags: - All measurements have the following tags: @@ -114,6 +122,14 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - durable - auto_delete +- rabbitmq_exchange + - url + - exchange + - vhost + - internal + - durable + - auto_delete + ### Sample Queries: Message rates for the entire node can be calculated from total message counts. For instance, to get the rate of messages published per minute, use this query: @@ -129,4 +145,5 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m) rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000 rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000 rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000 +rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000 ``` diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 06966ab58e3f6..48d26f4a1c331 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -48,8 +48,9 @@ type RabbitMQ struct { ResponseHeaderTimeout internal.Duration `toml:"header_timeout"` ClientTimeout internal.Duration `toml:"client_timeout"` - Nodes []string - Queues []string + Nodes []string + Queues []string + Exchanges []string Client *http.Client } @@ -78,6 +79,8 @@ type MessageStats struct { PublishDetails Details `json:"publish_details"` Redeliver int64 RedeliverDetails Details `json:"redeliver_details"` + PublishIn int64 `json:"publish_in"` + PublishOut int64 `json:"publish_out"` } // ObjectTotals ... @@ -133,10 +136,19 @@ type Node struct { SocketsUsed int64 `json:"sockets_used"` } +type Exchange struct { + Name string + MessageStats `json:"message_stats"` + Internal bool + Vhost string + Durable bool + AutoDelete bool `json:"auto_delete"` +} + // gatherFunc ... type gatherFunc func(r *RabbitMQ, acc telegraf.Accumulator) -var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues} +var gatherFunctions = []gatherFunc{gatherOverview, gatherNodes, gatherQueues, gatherExchanges} var sampleConfig = ` ## Management Plugin url. (default: http://localhost:15672) @@ -171,6 +183,10 @@ var sampleConfig = ` ## A list of queues to gather as the rabbitmq_queue measurement. If not ## specified, metrics for all queues are gathered. # queues = ["telegraf"] + + ## A list of exchanges to gather as the rabbitmq_exchange measurement. If not + ## specified, metrics for all exchanges are gathered. + # exchanges = ["telegraf"] ` // SampleConfig ... @@ -374,6 +390,39 @@ func gatherQueues(r *RabbitMQ, acc telegraf.Accumulator) { } } +func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) { + // Gather information about exchanges + exchanges := make([]Exchange, 0) + err := r.requestJSON("/api/exchanges", &exchanges) + if err != nil { + acc.AddError(err) + return + } + + for _, exchange := range exchanges { + if !r.shouldGatherExchange(exchange) { + continue + } + tags := map[string]string{ + "url": r.URL, + "exchange": exchange.Name, + "vhost": exchange.Vhost, + "internal": strconv.FormatBool(exchange.Internal), + "durable": strconv.FormatBool(exchange.Durable), + "auto_delete": strconv.FormatBool(exchange.AutoDelete), + } + + acc.AddFields( + "rabbitmq_exchange", + map[string]interface{}{ + "messages_publish_in": exchange.MessageStats.PublishIn, + "messages_publish_out": exchange.MessageStats.PublishOut, + }, + tags, + ) + } +} + func (r *RabbitMQ) shouldGatherNode(node Node) bool { if len(r.Nodes) == 0 { return true @@ -402,6 +451,20 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool { return false } +func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool { + if len(r.Exchanges) == 0 { + return true + } + + for _, name := range r.Exchanges { + if name == exchange.Name { + return true + } + } + + return false +} + func init() { inputs.Add("rabbitmq", func() telegraf.Input { return &RabbitMQ{ diff --git a/plugins/inputs/rabbitmq/rabbitmq_test.go b/plugins/inputs/rabbitmq/rabbitmq_test.go index 3be0259bc2450..f315e22f1d75f 100644 --- a/plugins/inputs/rabbitmq/rabbitmq_test.go +++ b/plugins/inputs/rabbitmq/rabbitmq_test.go @@ -374,6 +374,102 @@ const sampleQueuesResponse = ` ] ` +const sampleExchangesResponse = ` +[ + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "direct", + "vhost": "\/", + "name": "" + }, + { + "message_stats": { + "publish_in_details": { + "rate": 0 + }, + "publish_in": 2, + "publish_out_details": { + "rate": 0 + }, + "publish_out": 1 + }, + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "fanout", + "vhost": "\/", + "name": "telegraf" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "direct", + "vhost": "\/", + "name": "amq.direct" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "fanout", + "vhost": "\/", + "name": "amq.fanout" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "headers", + "vhost": "\/", + "name": "amq.headers" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "headers", + "vhost": "\/", + "name": "amq.match" + }, + { + "arguments": { }, + "internal": true, + "auto_delete": false, + "durable": true, + "type": "topic", + "vhost": "\/", + "name": "amq.rabbitmq.log" + }, + { + "arguments": { }, + "internal": true, + "auto_delete": false, + "durable": true, + "type": "topic", + "vhost": "\/", + "name": "amq.rabbitmq.trace" + }, + { + "arguments": { }, + "internal": false, + "auto_delete": false, + "durable": true, + "type": "topic", + "vhost": "\/", + "name": "amq.topic" + } +] +` + func TestRabbitMQGeneratesMetrics(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var rsp string @@ -385,6 +481,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { rsp = sampleNodesResponse case "/api/queues": rsp = sampleQueuesResponse + case "/api/exchanges": + rsp = sampleExchangesResponse default: panic("Cannot handle request") } @@ -441,4 +539,13 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { } assert.True(t, acc.HasMeasurement("rabbitmq_queue")) + + exchangeIntMetrics := []string{ + "messages_publish_in", + "messages_publish_out", + } + + for _, metric := range exchangeIntMetrics { + assert.True(t, acc.HasInt64Field("rabbitmq_exchange", metric)) + } } From 4c208aed0a13493e4c4aa9dd11318b08992c0ca0 Mon Sep 17 00:00:00 2001 From: kerams Date: Fri, 22 Dec 2017 23:19:59 +0100 Subject: [PATCH 2/3] Fix formatting --- plugins/inputs/rabbitmq/rabbitmq.go | 38 ++++++++++++------------ plugins/inputs/rabbitmq/rabbitmq_test.go | 18 +++++------ 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index 48d26f4a1c331..cd704df956374 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -50,7 +50,7 @@ type RabbitMQ struct { Nodes []string Queues []string - Exchanges []string + Exchanges []string Client *http.Client } @@ -79,8 +79,8 @@ type MessageStats struct { PublishDetails Details `json:"publish_details"` Redeliver int64 RedeliverDetails Details `json:"redeliver_details"` - PublishIn int64 `json:"publish_in"` - PublishOut int64 `json:"publish_out"` + PublishIn int64 `json:"publish_in"` + PublishOut int64 `json:"publish_out"` } // ObjectTotals ... @@ -137,12 +137,12 @@ type Node struct { } type Exchange struct { - Name string - MessageStats `json:"message_stats"` - Internal bool - Vhost string - Durable bool - AutoDelete bool `json:"auto_delete"` + Name string + MessageStats `json:"message_stats"` + Internal bool + Vhost string + Durable bool + AutoDelete bool `json:"auto_delete"` } // gatherFunc ... @@ -407,7 +407,7 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) { "url": r.URL, "exchange": exchange.Name, "vhost": exchange.Vhost, - "internal": strconv.FormatBool(exchange.Internal), + "internal": strconv.FormatBool(exchange.Internal), "durable": strconv.FormatBool(exchange.Durable), "auto_delete": strconv.FormatBool(exchange.AutoDelete), } @@ -452,17 +452,17 @@ func (r *RabbitMQ) shouldGatherQueue(queue Queue) bool { } func (r *RabbitMQ) shouldGatherExchange(exchange Exchange) bool { - if len(r.Exchanges) == 0 { - return true - } + if len(r.Exchanges) == 0 { + return true + } - for _, name := range r.Exchanges { - if name == exchange.Name { - return true - } - } + for _, name := range r.Exchanges { + if name == exchange.Name { + return true + } + } - return false + return false } func init() { diff --git a/plugins/inputs/rabbitmq/rabbitmq_test.go b/plugins/inputs/rabbitmq/rabbitmq_test.go index f315e22f1d75f..759c71c4175b4 100644 --- a/plugins/inputs/rabbitmq/rabbitmq_test.go +++ b/plugins/inputs/rabbitmq/rabbitmq_test.go @@ -481,8 +481,8 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { rsp = sampleNodesResponse case "/api/queues": rsp = sampleQueuesResponse - case "/api/exchanges": - rsp = sampleExchangesResponse + case "/api/exchanges": + rsp = sampleExchangesResponse default: panic("Cannot handle request") } @@ -540,12 +540,12 @@ func TestRabbitMQGeneratesMetrics(t *testing.T) { assert.True(t, acc.HasMeasurement("rabbitmq_queue")) - exchangeIntMetrics := []string{ - "messages_publish_in", - "messages_publish_out", - } + exchangeIntMetrics := []string{ + "messages_publish_in", + "messages_publish_out", + } - for _, metric := range exchangeIntMetrics { - assert.True(t, acc.HasInt64Field("rabbitmq_exchange", metric)) - } + for _, metric := range exchangeIntMetrics { + assert.True(t, acc.HasInt64Field("rabbitmq_exchange", metric)) + } } From d7ba01c315c4736a4166fdbbac5843ce98b7b2c2 Mon Sep 17 00:00:00 2001 From: kerams Date: Sat, 23 Dec 2017 09:14:24 +0100 Subject: [PATCH 3/3] Add type tag to rabbitmq_exchange --- plugins/inputs/rabbitmq/README.md | 3 ++- plugins/inputs/rabbitmq/rabbitmq.go | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/inputs/rabbitmq/README.md b/plugins/inputs/rabbitmq/README.md index d5e2eb5f68b9a..a1dfc879a605d 100644 --- a/plugins/inputs/rabbitmq/README.md +++ b/plugins/inputs/rabbitmq/README.md @@ -125,6 +125,7 @@ For additional details reference the [RabbitMQ Management HTTP Stats](https://cd - rabbitmq_exchange - url - exchange + - type - vhost - internal - durable @@ -145,5 +146,5 @@ FROM rabbitmq_overview WHERE time > now() - 10m GROUP BY time(1m) rabbitmq_queue,url=http://amqp.example.org:15672,queue=telegraf,vhost=influxdb,node=rabbit@amqp.example.org,durable=true,auto_delete=false,host=amqp.example.org messages_deliver_get=0i,messages_publish=329i,messages_publish_rate=0.2,messages_redeliver_rate=0,message_bytes_ready=0i,message_bytes_unacked=0i,messages_deliver=329i,messages_unack=0i,consumers=1i,idle_since="",messages=0i,messages_deliver_rate=0.2,messages_deliver_get_rate=0.2,messages_redeliver=0i,memory=43032i,message_bytes_ram=0i,messages_ack=329i,messages_ready=0i,messages_ack_rate=0.2,consumer_utilisation=1,message_bytes=0i,message_bytes_persist=0i 1493684035000000000 rabbitmq_overview,url=http://amqp.example.org:15672,host=amqp.example.org channels=2i,consumers=1i,exchanges=17i,messages_acked=329i,messages=0i,messages_ready=0i,messages_unacked=0i,connections=2i,queues=1i,messages_delivered=329i,messages_published=329i 1493684035000000000 rabbitmq_node,url=http://amqp.example.org:15672,node=rabbit@amqp.example.org,host=amqp.example.org fd_total=1024i,fd_used=32i,mem_limit=8363329126i,sockets_total=829i,disk_free=8175935488i,disk_free_limit=50000000i,mem_used=58771080i,proc_total=1048576i,proc_used=267i,run_queue=0i,sockets_used=2i 149368403500000000 -rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000 +rabbitmq_exchange,url=http://amqp.example.org:15672,exchange=telegraf,type=fanout,vhost=influxdb,internal=false,durable=true,auto_delete=false,host=amqp.example.org messages_publish_in=2i,messages_publish_out=1i 149368403500000000 ``` diff --git a/plugins/inputs/rabbitmq/rabbitmq.go b/plugins/inputs/rabbitmq/rabbitmq.go index cd704df956374..5d8e9028367cb 100644 --- a/plugins/inputs/rabbitmq/rabbitmq.go +++ b/plugins/inputs/rabbitmq/rabbitmq.go @@ -139,6 +139,7 @@ type Node struct { type Exchange struct { Name string MessageStats `json:"message_stats"` + Type string Internal bool Vhost string Durable bool @@ -406,6 +407,7 @@ func gatherExchanges(r *RabbitMQ, acc telegraf.Accumulator) { tags := map[string]string{ "url": r.URL, "exchange": exchange.Name, + "type": exchange.Type, "vhost": exchange.Vhost, "internal": strconv.FormatBool(exchange.Internal), "durable": strconv.FormatBool(exchange.Durable),