-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ActiveMQ input plugin #2689
Changes from 6 commits
a57479b
2b5c2e3
454d7ad
189c429
6a0acbb
c04543e
f410178
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
# Telegraf Input Plugin: ActiveMQ | ||
|
||
This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API. | ||
|
||
### Configuration: | ||
|
||
```toml | ||
# Description | ||
[[inputs.activemq]] | ||
## Required ActiveMQ Endpoint | ||
server = "192.168.50.10" | ||
## Required ActiveMQ port | ||
port = 8161 | ||
## Required username used for request HTTP Basic Authentication | ||
username = "admin" | ||
## Required password used for HTTP Basic Authentication | ||
password = "admin" | ||
## Required ActiveMQ webadmin root path | ||
webadmin = "admin" | ||
|
||
## Maximum time to receive response. | ||
# response_timeout = "5s" | ||
|
||
## Optional TLS Config | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
# insecure_skip_verify = false | ||
``` | ||
|
||
### Measurements & Fields: | ||
|
||
Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API. | ||
|
||
- activemq_queues: | ||
- size | ||
- consumer_count | ||
- enqueue_count | ||
- dequeue_count | ||
- activemq_topics: | ||
- size | ||
- consumer_count | ||
- enqueue_count | ||
- dequeue_count | ||
- subscribers_metrics: | ||
- pending_queue_size | ||
- dispatched_queue_size | ||
- dispatched_counter | ||
- enqueue_counter | ||
- dequeue_counter | ||
|
||
### Tags: | ||
|
||
- activemq_queues: | ||
- name | ||
- source | ||
- port | ||
- activemq_topics: | ||
- name | ||
- source | ||
- port | ||
- activemq_subscribers: | ||
- client_id | ||
- subscription_name | ||
- connection_id | ||
- destination_name | ||
- selector | ||
- active | ||
- source | ||
- port | ||
|
||
### Example Output: | ||
|
||
``` | ||
$ ./telegraf -config telegraf.conf -input-filter activemq -test | ||
activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 | ||
activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000 | ||
activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000 | ||
activemq_topics,host=88284b2fe51b,name=AAA\,source=localhost,port=8161 size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000 | ||
activemq_topics,name=ActiveMQ.Advisory.Topic\,source=localhost,port=8161 ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000 | ||
activemq_topics,name=ActiveMQ.Advisory.Queue\,source=localhost,port=8161 ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000 | ||
activemq_topics,name=AAAA\ ,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 | ||
activemq_subscribers,connection_id=NOTSET,destination_name=AAA,,source=localhost,port=8161,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000 | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,257 @@ | ||
package activemq | ||
|
||
import ( | ||
"encoding/xml" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"strconv" | ||
"time" | ||
|
||
"strings" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/internal/tls" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
) | ||
|
||
type ActiveMQ struct { | ||
Server string `json:"server"` | ||
Port int `json:"port"` | ||
Username string `json:"username"` | ||
Password string `json:"password"` | ||
Webadmin string `json:"webadmin"` | ||
ResponseTimeout internal.Duration | ||
tls.ClientConfig | ||
|
||
client *http.Client | ||
} | ||
|
||
type Topics struct { | ||
XMLName xml.Name `xml:"topics"` | ||
TopicItems []Topic `xml:"topic"` | ||
} | ||
|
||
type Topic struct { | ||
XMLName xml.Name `xml:"topic"` | ||
Name string `xml:"name,attr"` | ||
Stats Stats `xml:"stats"` | ||
} | ||
|
||
type Subscribers struct { | ||
XMLName xml.Name `xml:"subscribers"` | ||
SubscriberItems []Subscriber `xml:"subscriber"` | ||
} | ||
|
||
type Subscriber struct { | ||
XMLName xml.Name `xml:"subscriber"` | ||
ClientId string `xml:"clientId,attr"` | ||
SubscriptionName string `xml:"subscriptionName,attr"` | ||
ConnectionId string `xml:"connectionId,attr"` | ||
DestinationName string `xml:"destinationName,attr"` | ||
Selector string `xml:"selector,attr"` | ||
Active string `xml:"active,attr"` | ||
Stats Stats `xml:"stats"` | ||
} | ||
|
||
type Queues struct { | ||
XMLName xml.Name `xml:"queues"` | ||
QueueItems []Queue `xml:"queue"` | ||
} | ||
|
||
type Queue struct { | ||
XMLName xml.Name `xml:"queue"` | ||
Name string `xml:"name,attr"` | ||
Stats Stats `xml:"stats"` | ||
} | ||
|
||
type Stats struct { | ||
XMLName xml.Name `xml:"stats"` | ||
Size int `xml:"size,attr"` | ||
ConsumerCount int `xml:"consumerCount,attr"` | ||
EnqueueCount int `xml:"enqueueCount,attr"` | ||
DequeueCount int `xml:"dequeueCount,attr"` | ||
PendingQueueSize int `xml:"pendingQueueSize,attr"` | ||
DispatchedQueueSize int `xml:"dispatchedQueueSize,attr"` | ||
DispatchedCounter int `xml:"dispatchedCounter,attr"` | ||
EnqueueCounter int `xml:"enqueueCounter,attr"` | ||
DequeueCounter int `xml:"dequeueCounter,attr"` | ||
} | ||
|
||
const ( | ||
QUEUES_STATS = "queues" | ||
TOPICS_STATS = "topics" | ||
SUBSCRIBERS_STATS = "subscribers" | ||
) | ||
|
||
var sampleConfig = ` | ||
## Required ActiveMQ Endpoint | ||
# server = "192.168.50.10" | ||
## Required ActiveMQ port | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nitpick/suggestion: add newlines between config options There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
# port = 8161 | ||
## Required username used for request HTTP Basic Authentication | ||
# username = "admin" | ||
## Required password used for HTTP Basic Authentication | ||
# password = "admin" | ||
## Required ActiveMQ webadmin root path | ||
# webadmin = "admin" | ||
## Maximum time to receive response. | ||
# response_timeout = "5s" | ||
## Optional TLS Config | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
` | ||
|
||
func (a *ActiveMQ) Description() string { | ||
return "Gather ActiveMQ metrics" | ||
} | ||
|
||
func (a *ActiveMQ) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (a *ActiveMQ) createHttpClient() (*http.Client, error) { | ||
tlsCfg, err := a.ClientConfig.TLSConfig() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
client := &http.Client{ | ||
Transport: &http.Transport{ | ||
TLSClientConfig: tlsCfg, | ||
}, | ||
Timeout: a.ResponseTimeout.Duration, | ||
} | ||
|
||
return client, nil | ||
} | ||
|
||
func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) { | ||
if a.ResponseTimeout.Duration < time.Second { | ||
a.ResponseTimeout.Duration = time.Second * 5 | ||
} | ||
|
||
if a.client == nil { | ||
client, err := a.createHttpClient() | ||
if err != nil { | ||
return nil, err | ||
} | ||
a.client = client | ||
} | ||
url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword) | ||
|
||
req, err := http.NewRequest("GET", url, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
req.SetBasicAuth(a.Username, a.Password) | ||
resp, err := a.client.Do(req) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
defer resp.Body.Close() | ||
return ioutil.ReadAll(resp.Body) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you reduce the amount of vertical whitespace in this function? |
||
|
||
func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) { | ||
for _, queue := range queues.QueueItems { | ||
records := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["name"] = strings.TrimSpace(queue.Name) | ||
tags["source"] = a.Server | ||
tags["port"] = strconv.Itoa(a.Port) | ||
|
||
records["size"] = queue.Stats.Size | ||
records["consumer_count"] = queue.Stats.ConsumerCount | ||
records["enqueue_count"] = queue.Stats.EnqueueCount | ||
records["dequeue_count"] = queue.Stats.DequeueCount | ||
|
||
acc.AddFields("activemq_queues", records, tags) | ||
} | ||
} | ||
|
||
func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) { | ||
for _, topic := range topics.TopicItems { | ||
records := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["name"] = topic.Name | ||
tags["source"] = a.Server | ||
tags["port"] = strconv.Itoa(a.Port) | ||
|
||
records["size"] = topic.Stats.Size | ||
records["consumer_count"] = topic.Stats.ConsumerCount | ||
records["enqueue_count"] = topic.Stats.EnqueueCount | ||
records["dequeue_count"] = topic.Stats.DequeueCount | ||
|
||
acc.AddFields("activemq_topics", records, tags) | ||
} | ||
} | ||
|
||
func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers Subscribers) { | ||
for _, subscriber := range subscribers.SubscriberItems { | ||
records := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["client_id"] = subscriber.ClientId | ||
tags["subscription_name"] = subscriber.SubscriptionName | ||
tags["connection_id"] = subscriber.ConnectionId | ||
tags["destination_name"] = subscriber.DestinationName | ||
tags["selector"] = subscriber.Selector | ||
tags["active"] = subscriber.Active | ||
tags["source"] = a.Server | ||
tags["port"] = strconv.Itoa(a.Port) | ||
|
||
records["pending_queue_size"] = subscriber.Stats.PendingQueueSize | ||
records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize | ||
records["dispatched_counter"] = subscriber.Stats.DispatchedCounter | ||
records["enqueue_counter"] = subscriber.Stats.EnqueueCounter | ||
records["dequeue_counter"] = subscriber.Stats.DequeueCounter | ||
|
||
acc.AddFields("activemq_subscribers", records, tags) | ||
} | ||
} | ||
|
||
func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { | ||
dataQueues, err := a.GetMetrics(QUEUES_STATS) | ||
queues := Queues{} | ||
err = xml.Unmarshal(dataQueues, &queues) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
dataTopics, err := a.GetMetrics(TOPICS_STATS) | ||
topics := Topics{} | ||
err = xml.Unmarshal(dataTopics, &topics) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS) | ||
subscribers := Subscribers{} | ||
err = xml.Unmarshal(dataSubscribers, &subscribers) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
a.GatherQueuesMetrics(acc, queues) | ||
a.GatherTopicsMetrics(acc, topics) | ||
a.GatherSubscribersMetrics(acc, subscribers) | ||
|
||
return nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you reduce the amount of vertical whitespace in this function? |
||
|
||
func init() { | ||
inputs.Add("activemq", func() telegraf.Input { | ||
return &ActiveMQ{ | ||
Server: "localhost", | ||
Port: 8161, | ||
} | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will also need a tag for the server for differentiation when running the plugin multiple times. Following the discussion on #4413, let's add
source
andport
tags to all three measurements: