Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic RabbitMQ queues metricset #4788

Merged
merged 12 commits into from
Sep 27, 2017
2 changes: 1 addition & 1 deletion metricbeat/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ kibana:

# Collects all module and metricset fields
.PHONY: fields
fields:
fields: python-env
@mkdir -p _meta
@cp ${ES_BEATS}/metricbeat/_meta/fields.common.yml _meta/fields.generated.yml
@${PYTHON_ENV}/bin/python ${ES_BEATS}/metricbeat/scripts/fields_collector.py >> _meta/fields.generated.yml
Expand Down
19 changes: 19 additions & 0 deletions metricbeat/docs/modules/rabbitmq/queues.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
////
This file is generated! See scripts/docs_collector.py
////

[[metricbeat-metricset-rabbitmq-queues]]
include::../../../module/rabbitmq/queues/_meta/docs.asciidoc[]


==== Fields

For a description of each field in the metricset, see the
<<exported-fields-rabbitmq,exported fields>> section.

Here is an example document generated by this metricset:

[source,json]
----
include::../../../module/rabbitmq/queues/_meta/data.json[]
----
1 change: 1 addition & 0 deletions metricbeat/include/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ import (
_ "github.com/elastic/beats/metricbeat/module/prometheus/stats"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/node"
_ "github.com/elastic/beats/metricbeat/module/rabbitmq/queues"
_ "github.com/elastic/beats/metricbeat/module/redis"
_ "github.com/elastic/beats/metricbeat/module/redis/info"
_ "github.com/elastic/beats/metricbeat/module/redis/keyspace"
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ metricbeat.modules:

#------------------------------ RabbitMQ Module ------------------------------
- module: rabbitmq
metricsets: ["node"]
metricsets: ["node", "queues"]
period: 10s
hosts: ["localhost:15672"]

Expand Down
2 changes: 1 addition & 1 deletion metricbeat/module/rabbitmq/_meta/config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
- module: rabbitmq
metricsets: ["node"]
metricsets: ["node", "queues"]
period: 10s
hosts: ["localhost:15672"]

Expand Down
145 changes: 145 additions & 0 deletions metricbeat/module/rabbitmq/_meta/testdata/queues_sample_response.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
[
{
"memory": 232720,
"message_stats": {
"disk_reads": 212,
"disk_reads_details": {
"rate": 0
},
"disk_writes": 121,
"disk_writes_details": {
"rate": 0
},
"deliver": 15,
"deliver_details": {
"rate": 0
},
"deliver_no_ack": 0,
"deliver_no_ack_details": {
"rate": 0
},
"get": 0,
"get_details": {
"rate": 0
},
"get_no_ack": 38,
"get_no_ack_details": {
"rate": 0
},
"publish": 121,
"publish_details": {
"rate": 0
},
"publish_in": 0,
"publish_in_details": {
"rate": 0
},
"publish_out": 0,
"publish_out_details": {
"rate": 0
},
"ack": 9,
"ack_details": {
"rate": 0
},
"deliver_get": 53,
"deliver_get_details": {
"rate": 0
},
"confirm": 0,
"confirm_details": {
"rate": 0
},
"return_unroutable": 0,
"return_unroutable_details": {
"rate": 0
},
"redeliver": 3,
"redeliver_details": {
"rate": 0
}
},
"reductions": 787128,
"reductions_details": {
"rate": 0
},
"messages": 74,
"messages_details": {
"rate": 0
},
"messages_ready": 71,
"messages_ready_details": {
"rate": 0
},
"messages_unacknowledged": 3,
"messages_unacknowledged_details": {
"rate": 0
},
"idle_since": "2017-07-28 23:45:52",
"consumer_utilisation": 0.7,
"policy": null,
"exclusive_consumer_tag": null,
"consumers": 3,
"recoverable_slaves": null,
"state": "running",
"garbage_collection": {
"min_bin_vheap_size": 46422,
"min_heap_size": 233,
"fullsweep_after": 65535,
"minor_gcs": 0
},
"messages_ram": 74,
"messages_ready_ram": 71,
"messages_unacknowledged_ram": 3,
"messages_persistent": 73,
"message_bytes": 101824,
"message_bytes_ready": 97696,
"message_bytes_unacknowledged": 4128,
"message_bytes_ram": 101824,
"message_bytes_persistent": 101824,
"head_message_timestamp": 1501250275,
"disk_reads": 212,
"disk_writes": 121,
"backing_queue_status": {
"priority_lengths": {
"0": 0,
"1": 71,
"2": 0,
"3": 0,
"4": 0,
"5": 0,
"6": 0,
"7": 0,
"8": 0,
"9": 0
},
"mode": "default",
"q1": 0,
"q2": 0,
"delta": [
"delta",
"todo",
"todo",
"todo"
],
"q3": 0,
"q4": 71,
"len": 71,
"target_ram_count": "infinity",
"next_seq_id": 121,
"avg_ingress_rate": 0,
"avg_egress_rate": 0.00019793395296866087,
"avg_ack_ingress_rate": 0.00019793395296866087,
"avg_ack_egress_rate": 0.00019793395296866087
},
"node": "rabbit@localhost",
"arguments": {
"x-max-priority": 9
},
"exclusive": false,
"auto_delete": false,
"durable": true,
"vhost": "/",
"name": "queuenamehere"
}
]
19 changes: 19 additions & 0 deletions metricbeat/module/rabbitmq/queues/_meta/data.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"@timestamp":"2016-05-23T08:05:34.853Z",
"beat":{
"hostname":"beathost",
"name":"beathost"
},
"metricset":{
"host":"localhost",
"module":"rabbitmq",
"name":"queues",
"rtt":44269
},
"rabbitmq":{
"queues":{
"example": "queues"
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like some fields are missing from this example data.json, can you generate a complete one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

},
"type":"metricsets"
}
3 changes: 3 additions & 0 deletions metricbeat/module/rabbitmq/queues/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
=== rabbitmq queues MetricSet

This is the queues metricset of the module rabbitmq.
73 changes: 73 additions & 0 deletions metricbeat/module/rabbitmq/queues/_meta/fields.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
- name: queues
type: group
description: >
queues
fields:
- name: name
type: keyword
description: >
The name of the queue with non-ASCII characters escaped as in C.
- name: vhost
type: keyword
description: >
Virtual host name with non-ASCII characters escaped as in C.
- name: durable
type: boolean
description: >
Whether or not the queue survives server restarts.
- name: auto_delete
type: boolean
description: >
Whether the queue will be deleted automatically when no longer used.
- name: exclusive
type: boolean
description: >
Whether the queue is exclusive (i.e. has owner_pid).
- name: node
type: keyword
description: >
Node name.
- name: state
type: keyword
description: >
The state of the queue. Normally 'running', but may be "{syncing, MsgCount}" if the queue is synchronising. Queues which are located on cluster nodes that are currently down will be shown with a status of 'down'.
- name: arguments.max_priority
type: long
description: >
Maximum number of priority levels for the queue to support.
- name: consumers.count
type: long
description: >
Number of consumers.
- name: consumers.utilisation.pct
type: long
description: >
Fraction of the time (between 0.0 and 1.0) that the queue is able to immediately deliver messages to consumers. This can be less than 1.0 if consumers are limited by network congestion or prefetch count.
- name: messages.total.count
type: long
description: >
Sum of ready and unacknowledged messages (queue depth).
- name: messages.ready.count
type: long
description: >
Number of messages ready to be delivered to clients.
- name: messages.unacknowledged.count
type: long
description: >
Number of messages delivered to clients but not yet acknowledged.
- name: messages.persistent.count
type: long
description: >
Total number of persistent messages in the queue (will always be 0 for transient queues).
- name: memory.bytes
type: long
description: >
Bytes of memory consumed by the Erlang process associated with the queue, including stack, heap and internal structures.
- name: disk.reads.count
type: long
description: >
Total number of times messages have been read from disk by this queue since it started.
- name: disk.writes.count
type: long
description: >
Total number of times messages have been written to disk by this queue since it started.
79 changes: 79 additions & 0 deletions metricbeat/module/rabbitmq/queues/data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package queues

import (
"encoding/json"

"github.com/elastic/beats/libbeat/common"
s "github.com/elastic/beats/libbeat/common/schema"
c "github.com/elastic/beats/libbeat/common/schema/mapstriface"
"github.com/elastic/beats/libbeat/logp"
)

var (
schema = s.Schema{
"name": c.Str("name"),
"vhost": c.Str("vhost"),
"durable": c.Bool("durable"),
"auto_delete": c.Bool("auto_delete"),
"exclusive": c.Bool("exclusive"),
"node": c.Str("node"),
"state": c.Str("state"),
"arguments": c.Dict("arguments", s.Schema{
"max_priority": c.Int("x-max-priority", s.Optional),
}),
"consumers": s.Object{
"count": c.Int("consumers"),
"utilisation": s.Object{
"pct": c.Int("consumer_utilisation", s.Optional),
},
},
"messages": s.Object{
"total": s.Object{
"count": c.Int("messages"),
},
"ready": s.Object{
"count": c.Int("messages_ready"),
},
"unacknowledged": s.Object{
"count": c.Int("messages_unacknowledged"),
},
"persistent": s.Object{
"count": c.Int("messages_persistent"),
},
},
"memory": s.Object{
"bytes": c.Int("memory"),
},
"disk": s.Object{
"reads": s.Object{
"count": c.Int("disk_reads"),
},
"writes": s.Object{
"count": c.Int("disk_writes"),
},
},
}
)

func eventsMapping(content []byte) ([]common.MapStr, error) {
var queues []map[string]interface{}
err := json.Unmarshal(content, &queues)
if err != nil {
logp.Err("Error: ", err)
}

events := []common.MapStr{}
errors := s.NewErrors()

for _, queue := range queues {
event, errs := eventMapping(queue)
events = append(events, event)
errors.AddErrors(errs)
}

return events, errors
}

func eventMapping(queue map[string]interface{}) (common.MapStr, *s.Errors) {
return schema.Apply(queue)
}
Loading