-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add ActiveMQ input plugin #2689
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
a57479b
activemq input plugin
mlabouardy 2b5c2e3
activemq test & README
mlabouardy 454d7ad
fix for review https://github.com/influxdata/telegraf/pull/2689
189c429
add client field to activemq struct
mlabouardy 6a0acbb
fix package name
mlabouardy c04543e
fix unit test
mlabouardy f410178
add line between config options
mlabouardy File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
# Telegraf Input Plugin: ActiveMQ | ||
|
||
This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API. | ||
|
||
### Configuration: | ||
|
||
```toml | ||
# Description | ||
[[inputs.activemq]] | ||
## Required ActiveMQ Endpoint | ||
# server = "192.168.50.10" | ||
|
||
## Required ActiveMQ port | ||
# port = 8161 | ||
|
||
## Credentials for basic HTTP authentication | ||
# username = "admin" | ||
# password = "admin" | ||
|
||
## Required ActiveMQ webadmin root path | ||
# webadmin = "admin" | ||
|
||
## Maximum time to receive response. | ||
# response_timeout = "5s" | ||
|
||
## Optional TLS Config | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
``` | ||
|
||
### Measurements & Fields: | ||
|
||
Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API. | ||
|
||
- activemq_queues: | ||
- size | ||
- consumer_count | ||
- enqueue_count | ||
- dequeue_count | ||
- activemq_topics: | ||
- size | ||
- consumer_count | ||
- enqueue_count | ||
- dequeue_count | ||
- subscribers_metrics: | ||
- pending_queue_size | ||
- dispatched_queue_size | ||
- dispatched_counter | ||
- enqueue_counter | ||
- dequeue_counter | ||
|
||
### Tags: | ||
|
||
- activemq_queues: | ||
- name | ||
- source | ||
- port | ||
- activemq_topics: | ||
- name | ||
- source | ||
- port | ||
- activemq_subscribers: | ||
- client_id | ||
- subscription_name | ||
- connection_id | ||
- destination_name | ||
- selector | ||
- active | ||
- source | ||
- port | ||
|
||
### Example Output: | ||
|
||
``` | ||
$ ./telegraf -config telegraf.conf -input-filter activemq -test | ||
activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 | ||
activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000 | ||
activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000 | ||
activemq_topics,host=88284b2fe51b,name=AAA\,source=localhost,port=8161 size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000 | ||
activemq_topics,name=ActiveMQ.Advisory.Topic\,source=localhost,port=8161 ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000 | ||
activemq_topics,name=ActiveMQ.Advisory.Queue\,source=localhost,port=8161 ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000 | ||
activemq_topics,name=AAAA\ ,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 | ||
activemq_subscribers,connection_id=NOTSET,destination_name=AAA,,source=localhost,port=8161,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000 | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,261 @@ | ||
package activemq | ||
|
||
import ( | ||
"encoding/xml" | ||
"fmt" | ||
"io/ioutil" | ||
"net/http" | ||
"strconv" | ||
"time" | ||
|
||
"strings" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/internal/tls" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
) | ||
|
||
type ActiveMQ struct { | ||
Server string `json:"server"` | ||
Port int `json:"port"` | ||
Username string `json:"username"` | ||
Password string `json:"password"` | ||
Webadmin string `json:"webadmin"` | ||
ResponseTimeout internal.Duration | ||
tls.ClientConfig | ||
|
||
client *http.Client | ||
} | ||
|
||
type Topics struct { | ||
XMLName xml.Name `xml:"topics"` | ||
TopicItems []Topic `xml:"topic"` | ||
} | ||
|
||
type Topic struct { | ||
XMLName xml.Name `xml:"topic"` | ||
Name string `xml:"name,attr"` | ||
Stats Stats `xml:"stats"` | ||
} | ||
|
||
type Subscribers struct { | ||
XMLName xml.Name `xml:"subscribers"` | ||
SubscriberItems []Subscriber `xml:"subscriber"` | ||
} | ||
|
||
type Subscriber struct { | ||
XMLName xml.Name `xml:"subscriber"` | ||
ClientId string `xml:"clientId,attr"` | ||
SubscriptionName string `xml:"subscriptionName,attr"` | ||
ConnectionId string `xml:"connectionId,attr"` | ||
DestinationName string `xml:"destinationName,attr"` | ||
Selector string `xml:"selector,attr"` | ||
Active string `xml:"active,attr"` | ||
Stats Stats `xml:"stats"` | ||
} | ||
|
||
type Queues struct { | ||
XMLName xml.Name `xml:"queues"` | ||
QueueItems []Queue `xml:"queue"` | ||
} | ||
|
||
type Queue struct { | ||
XMLName xml.Name `xml:"queue"` | ||
Name string `xml:"name,attr"` | ||
Stats Stats `xml:"stats"` | ||
} | ||
|
||
type Stats struct { | ||
XMLName xml.Name `xml:"stats"` | ||
Size int `xml:"size,attr"` | ||
ConsumerCount int `xml:"consumerCount,attr"` | ||
EnqueueCount int `xml:"enqueueCount,attr"` | ||
DequeueCount int `xml:"dequeueCount,attr"` | ||
PendingQueueSize int `xml:"pendingQueueSize,attr"` | ||
DispatchedQueueSize int `xml:"dispatchedQueueSize,attr"` | ||
DispatchedCounter int `xml:"dispatchedCounter,attr"` | ||
EnqueueCounter int `xml:"enqueueCounter,attr"` | ||
DequeueCounter int `xml:"dequeueCounter,attr"` | ||
} | ||
|
||
const ( | ||
QUEUES_STATS = "queues" | ||
TOPICS_STATS = "topics" | ||
SUBSCRIBERS_STATS = "subscribers" | ||
) | ||
|
||
var sampleConfig = ` | ||
## Required ActiveMQ Endpoint | ||
# server = "192.168.50.10" | ||
|
||
## Required ActiveMQ port | ||
# port = 8161 | ||
|
||
## Credentials for basic HTTP authentication | ||
# username = "admin" | ||
# password = "admin" | ||
|
||
## Required ActiveMQ webadmin root path | ||
# webadmin = "admin" | ||
|
||
## Maximum time to receive response. | ||
# response_timeout = "5s" | ||
|
||
## Optional TLS Config | ||
# tls_ca = "/etc/telegraf/ca.pem" | ||
# tls_cert = "/etc/telegraf/cert.pem" | ||
# tls_key = "/etc/telegraf/key.pem" | ||
## Use TLS but skip chain & host verification | ||
` | ||
|
||
func (a *ActiveMQ) Description() string { | ||
return "Gather ActiveMQ metrics" | ||
} | ||
|
||
func (a *ActiveMQ) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (a *ActiveMQ) createHttpClient() (*http.Client, error) { | ||
tlsCfg, err := a.ClientConfig.TLSConfig() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
client := &http.Client{ | ||
Transport: &http.Transport{ | ||
TLSClientConfig: tlsCfg, | ||
}, | ||
Timeout: a.ResponseTimeout.Duration, | ||
} | ||
|
||
return client, nil | ||
} | ||
|
||
func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) { | ||
if a.ResponseTimeout.Duration < time.Second { | ||
a.ResponseTimeout.Duration = time.Second * 5 | ||
} | ||
|
||
if a.client == nil { | ||
client, err := a.createHttpClient() | ||
if err != nil { | ||
return nil, err | ||
} | ||
a.client = client | ||
} | ||
url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword) | ||
|
||
req, err := http.NewRequest("GET", url, nil) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
req.SetBasicAuth(a.Username, a.Password) | ||
resp, err := a.client.Do(req) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
defer resp.Body.Close() | ||
return ioutil.ReadAll(resp.Body) | ||
} | ||
|
||
func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) { | ||
for _, queue := range queues.QueueItems { | ||
records := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["name"] = strings.TrimSpace(queue.Name) | ||
tags["source"] = a.Server | ||
tags["port"] = strconv.Itoa(a.Port) | ||
|
||
records["size"] = queue.Stats.Size | ||
records["consumer_count"] = queue.Stats.ConsumerCount | ||
records["enqueue_count"] = queue.Stats.EnqueueCount | ||
records["dequeue_count"] = queue.Stats.DequeueCount | ||
|
||
acc.AddFields("activemq_queues", records, tags) | ||
} | ||
} | ||
|
||
func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) { | ||
for _, topic := range topics.TopicItems { | ||
records := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["name"] = topic.Name | ||
tags["source"] = a.Server | ||
tags["port"] = strconv.Itoa(a.Port) | ||
|
||
records["size"] = topic.Stats.Size | ||
records["consumer_count"] = topic.Stats.ConsumerCount | ||
records["enqueue_count"] = topic.Stats.EnqueueCount | ||
records["dequeue_count"] = topic.Stats.DequeueCount | ||
|
||
acc.AddFields("activemq_topics", records, tags) | ||
} | ||
} | ||
|
||
func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers Subscribers) { | ||
for _, subscriber := range subscribers.SubscriberItems { | ||
records := make(map[string]interface{}) | ||
tags := make(map[string]string) | ||
|
||
tags["client_id"] = subscriber.ClientId | ||
tags["subscription_name"] = subscriber.SubscriptionName | ||
tags["connection_id"] = subscriber.ConnectionId | ||
tags["destination_name"] = subscriber.DestinationName | ||
tags["selector"] = subscriber.Selector | ||
tags["active"] = subscriber.Active | ||
tags["source"] = a.Server | ||
tags["port"] = strconv.Itoa(a.Port) | ||
|
||
records["pending_queue_size"] = subscriber.Stats.PendingQueueSize | ||
records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize | ||
records["dispatched_counter"] = subscriber.Stats.DispatchedCounter | ||
records["enqueue_counter"] = subscriber.Stats.EnqueueCounter | ||
records["dequeue_counter"] = subscriber.Stats.DequeueCounter | ||
|
||
acc.AddFields("activemq_subscribers", records, tags) | ||
} | ||
} | ||
|
||
func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { | ||
dataQueues, err := a.GetMetrics(QUEUES_STATS) | ||
queues := Queues{} | ||
err = xml.Unmarshal(dataQueues, &queues) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
dataTopics, err := a.GetMetrics(TOPICS_STATS) | ||
topics := Topics{} | ||
err = xml.Unmarshal(dataTopics, &topics) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS) | ||
subscribers := Subscribers{} | ||
err = xml.Unmarshal(dataSubscribers, &subscribers) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
a.GatherQueuesMetrics(acc, queues) | ||
a.GatherTopicsMetrics(acc, topics) | ||
a.GatherSubscribersMetrics(acc, subscribers) | ||
|
||
return nil | ||
} | ||
|
||
func init() { | ||
inputs.Add("activemq", func() telegraf.Input { | ||
return &ActiveMQ{ | ||
Server: "localhost", | ||
Port: 8161, | ||
} | ||
}) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will also need a tag for the server for differentiation when running the plugin multiple times. Following the discussion on #4413, let's add
source
andport
tags to all three measurements: