From a57479b4c726680777cc57ebc4dc9e830e90366e Mon Sep 17 00:00:00 2001 From: mlabouardy Date: Wed, 19 Apr 2017 15:54:37 +0200 Subject: [PATCH 1/7] activemq input plugin --- README.md | 1 + plugins/inputs/activemq/README.md | 0 plugins/inputs/activemq/activemq.go | 219 ++++++++++++++++++++++++++++ plugins/inputs/all/all.go | 1 + 4 files changed, 221 insertions(+) create mode 100644 plugins/inputs/activemq/README.md create mode 100644 plugins/inputs/activemq/activemq.go diff --git a/README.md b/README.md index 6307b5356f191..700f0dd2a49d2 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,7 @@ configuration options. ## Input Plugins +* [activemq](./plugins/inputs/activemq) * [aerospike](./plugins/inputs/aerospike) * [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq) * [apache](./plugins/inputs/apache) diff --git a/plugins/inputs/activemq/README.md b/plugins/inputs/activemq/README.md new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go new file mode 100644 index 0000000000000..41de0eeb9eb1a --- /dev/null +++ b/plugins/inputs/activemq/activemq.go @@ -0,0 +1,219 @@ +package main + +import ( + "encoding/xml" + "fmt" + "io/ioutil" + "net/http" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type ActiveMQ struct { + Server string `json:"server"` + Port int `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Webadmin string `json:"webadmin"` +} + +type Topics struct { + XMLName xml.Name `xml:"topics"` + TopicItems []Topic `xml:"topic"` +} + +type Topic struct { + XMLName xml.Name `xml:"topic"` + Name string `xml:"name,attr"` + Stats Stats `xml:"stats"` +} + +type Subscribers struct { + XMLName xml.Name `xml:"subscribers"` + SubscriberItems []Subscriber `xml:"subscriber"` +} + +type Subscriber struct { + XMLName xml.Name `xml:"subscriber"` + ClientId string `xml:"clientId,attr"` + SubscriptionName string `xml:"subscriptionName,attr"` + ConnectionId string `xml:"connectionId,attr"` + DestinationName string `xml:"destinationName,attr"` + Selector string `xml:"selector,attr"` + Active string `xml:"active,attr"` + Stats Stats `xml:"stats"` +} + +type Queues struct { + XMLName xml.Name `xml:"queues"` + QueueItems []Queue `xml:"queue"` +} + +type Queue struct { + XMLName xml.Name `xml:"queue"` + Name string `xml:"name,attr"` + Stats Stats `xml:"stats"` +} + +type Stats struct { + XMLName xml.Name `xml:"stats"` + Size int `xml:"size,attr"` + ConsumerCount int `xml:"consumerCount,attr"` + EnqueueCount int `xml:"enqueueCount,attr"` + DequeueCount int `xml:"dequeueCount,attr"` + PendingQueueSize int `xml:"pendingQueueSize,attr"` + DispatchedQueueSize int `xml:"dispatchedQueueSize,attr"` + DispatchedCounter int `xml:"dispatchedCounter,attr"` + EnqueueCounter int `xml:"enqueueCounter,attr"` + DequeueCounter int `xml:"dequeueCounter,attr"` +} + +const ( + QUEUES_STATS = "queues" + TOPICS_STATS = "topics" + SUBSCRIBERS_STATS = "subscribers" +) + +var sampleConfig = ` + ## Required ActiveMQ Endpoint + # server = "192.168.50.10" + ## Required ActiveMQ port + # port = 8161 + ## Required username used for request HTTP Basic Authentication + # username = "admin" + ## Required password used for HTTP Basic Authentication + # password = "admin" + ## Required ActiveMQ webadmin root path + # webadmin = "admin" + ` + +func (a *ActiveMQ) Description() string { + return "Gather ActiveMQ metrics" +} + +func (a *ActiveMQ) SampleConfig() string { + return sampleConfig +} + +func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) { + client := &http.Client{} + + url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword) + + req, err := http.NewRequest("GET", url, nil) + + if err != nil { + return nil, err + } + + req.SetBasicAuth(a.Username, a.Password) + + resp, err := client.Do(req) + + if err != nil { + return nil, err + } + + defer resp.Body.Close() + + return ioutil.ReadAll(resp.Body) +} + +func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) { + for _, queue := range queues.QueueItems { + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["name"] = queue.Name + + records["size"] = queue.Stats.Size + records["consumer_count"] = queue.Stats.ConsumerCount + records["enqueue_count"] = queue.Stats.EnqueueCount + records["dequeue_count"] = queue.Stats.DequeueCount + + acc.AddFields("queues_metrics", records, tags) + } +} + +func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) { + for _, topic := range topics.TopicItems { + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["name"] = topic.Name + + records["size"] = topic.Stats.Size + records["consumer_count"] = topic.Stats.ConsumerCount + records["enqueue_count"] = topic.Stats.EnqueueCount + records["dequeue_count"] = topic.Stats.DequeueCount + + acc.AddFields("topics_metrics", records, tags) + } +} + +func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers Subscribers) { + for _, subscriber := range subscribers.SubscriberItems { + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["client_id"] = subscriber.ClientId + tags["subscription_name"] = subscriber.SubscriptionName + tags["connection_id"] = subscriber.ConnectionId + tags["destination_name"] = subscriber.DestinationName + tags["selector"] = subscriber.Selector + tags["active"] = subscriber.Active + + records["pending_queue_size"] = subscriber.Stats.PendingQueueSize + records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize + records["dispatched_counter"] = subscriber.Stats.DispatchedCounter + records["enqueue_counter"] = subscriber.Stats.EnqueueCounter + records["dequeue_counter"] = subscriber.Stats.DequeueCounter + + acc.AddFields("subscribers_metrics", records, tags) + } +} + +func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { + dataQueues, err := a.GetMetrics(QUEUES_STATS) + + queues := Queues{} + + err = xml.Unmarshal(dataQueues, &queues) + + if err != nil { + return err + } + + dataTopics, err := a.GetMetrics(TOPICS_STATS) + + topics := Topics{} + + err = xml.Unmarshal(dataTopics, &topics) + + if err != nil { + return err + } + + dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS) + + subscribers := Subscribers{} + + err = xml.Unmarshal(dataSubscribers, &subscribers) + + if err != nil { + return err + } + + a.GatherQueuesMetrics(acc, queues) + + a.GatherTopicsMetrics(acc, topics) + + a.GatherSubscribersMetrics(acc, subscribers) + + return nil +} + +func init() { + inputs.Add("activemq", func() telegraf.Input { return &ActiveMQ{} }) +} diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index bbca99521817b..ac86fb87985e2 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -1,6 +1,7 @@ package all import ( + _ "github.com/influxdata/telegraf/plugins/inputs/activemq" _ "github.com/influxdata/telegraf/plugins/inputs/aerospike" _ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer" _ "github.com/influxdata/telegraf/plugins/inputs/apache" From 2b5c2e3191093466f4c88f23ee056aa937f55165 Mon Sep 17 00:00:00 2001 From: mlabouardy Date: Wed, 19 Apr 2017 16:16:40 +0200 Subject: [PATCH 2/7] activemq test & README --- plugins/inputs/activemq/README.md | 69 ++++++++++++ plugins/inputs/activemq/activemq.go | 2 +- plugins/inputs/activemq/activemq_test.go | 127 +++++++++++++++++++++++ 3 files changed, 197 insertions(+), 1 deletion(-) create mode 100644 plugins/inputs/activemq/activemq_test.go diff --git a/plugins/inputs/activemq/README.md b/plugins/inputs/activemq/README.md index e69de29bb2d1d..aa6198ed0739b 100644 --- a/plugins/inputs/activemq/README.md +++ b/plugins/inputs/activemq/README.md @@ -0,0 +1,69 @@ +# Telegraf Input Plugin: ActiveMQ + +This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API. + +### Configuration: + +```toml +# Description +[[inputs.activemq]] + ## Required ActiveMQ Endpoint + server = "192.168.50.10" + ## Required ActiveMQ port + port = 8161 + ## Required username used for request HTTP Basic Authentication + username = "admin" + ## Required password used for HTTP Basic Authentication + password = "admin" + ## Required ActiveMQ webadmin root path + webadmin = "admin" +``` + +### Measurements & Fields: + +Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API. + +- queues_metrics: + - size + - consumer_count + - enqueue_count + - dequeue_count + - topics_metrics: + - size + - consumer_count + - enqueue_count + - dequeue_count + - subscribers_metrics: + - pending_queue_size + - dispatched_queue_size + - dispatched_counter + - enqueue_counter + - dequeue_counter + +### Tags: + +- queues_metrics: + - name +- topics_metrics: + - name +- subscribers_metrics: + - client_id + - subscription_name + - connection_id + - destination_name + - selector + - active + +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter activemq -test +queues_metrics,name=sandra,host=88284b2fe51b consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 +queues_metrics,name=Test,host=88284b2fe51b dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000 +topics_metrics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000 +topics_metrics,host=88284b2fe51b,name=AAA\ size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000 +topics_metrics,name=ActiveMQ.Advisory.Topic\ ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000 +topics_metrics,name=ActiveMQ.Advisory.Queue\ ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000 +topics_metrics,name=AAAA\ ,host=88284b2fe51b consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 +subscribers_metrics,connection_id=NOTSET,destination_name=AAA,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000 +``` diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go index 41de0eeb9eb1a..f7433329f4396 100644 --- a/plugins/inputs/activemq/activemq.go +++ b/plugins/inputs/activemq/activemq.go @@ -1,4 +1,4 @@ -package main +package activemq import ( "encoding/xml" diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go new file mode 100644 index 0000000000000..43552f56776bd --- /dev/null +++ b/plugins/inputs/activemq/activemq_test.go @@ -0,0 +1,127 @@ +package activemq + +import ( + "encoding/xml" + "testing" + + "github.com/influxdata/telegraf/testutil" +) + +func TestGatherQueuesMetrics(t *testing.T) { + + s := ` + + + +queueBrowse/sandra?view=rss&feedType=atom_1.0 +queueBrowse/sandra?view=rss&feedType=rss_2.0 + + + + + +queueBrowse/Test?view=rss&feedType=atom_1.0 +queueBrowse/Test?view=rss&feedType=rss_2.0 + + +` + + queues := Queues{} + + xml.Unmarshal([]byte(s), &queues) + + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["name"] = "Test" + + records["size"] = 0 + records["consumer_count"] = 0 + records["enqueue_count"] = 0 + records["dequeue_count"] = 0 + + var acc testutil.Accumulator + + activeMQ := new(ActiveMQ) + + activeMQ.GatherQueuesMetrics(&acc, queues) + acc.AssertContainsTaggedFields(t, "queues_metrics", records, tags) +} + +func TestGatherTopicsMetrics(t *testing.T) { + + s := ` + + + + + + + + + + + + + + + +` + + topics := Topics{} + + xml.Unmarshal([]byte(s), &topics) + + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["name"] = "ActiveMQ.Advisory.MasterBroker " + + records["size"] = 0 + records["consumer_count"] = 0 + records["enqueue_count"] = 1 + records["dequeue_count"] = 0 + + var acc testutil.Accumulator + + activeMQ := new(ActiveMQ) + + activeMQ.GatherTopicsMetrics(&acc, topics) + acc.AssertContainsTaggedFields(t, "topics_metrics", records, tags) +} + +func TestGatherSubscribersMetrics(t *testing.T) { + + s := ` + + + +` + + subscribers := Subscribers{} + + xml.Unmarshal([]byte(s), &subscribers) + + records := make(map[string]interface{}) + tags := make(map[string]string) + + tags["client_id"] = "AAA" + tags["subscription_name"] = "AAA" + tags["connection_id"] = "NOTSET" + tags["destination_name"] = "AAA" + tags["selector"] = "AA" + tags["active"] = "no" + + records["pending_queue_size"] = 0 + records["dispatched_queue_size"] = 0 + records["dispatched_counter"] = 0 + records["enqueue_counter"] = 0 + records["dequeue_counter"] = 0 + + var acc testutil.Accumulator + + activeMQ := new(ActiveMQ) + + activeMQ.GatherSubscribersMetrics(&acc, subscribers) + acc.AssertContainsTaggedFields(t, "subscribers_metrics", records, tags) +} From 454d7ad2fa855220fb44c0ad027796233d9b8b1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=A1=D0=B5=D1=80=D0=B3=D0=B5=D0=B9=20=D0=92=D0=B0=D1=81?= =?UTF-8?q?=D0=B5=D1=87=D0=BA=D0=BE?= Date: Fri, 30 Mar 2018 17:56:41 +0500 Subject: [PATCH 3/7] fix for review https://github.com/influxdata/telegraf/pull/2689 remove trailing spaces for queue name in gather metrics remove indent --- plugins/inputs/activemq/activemq.go | 41 +++++++++-------------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go index f7433329f4396..81c0783593ec2 100644 --- a/plugins/inputs/activemq/activemq.go +++ b/plugins/inputs/activemq/activemq.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" + "strings" ) type ActiveMQ struct { @@ -76,17 +77,17 @@ const ( ) var sampleConfig = ` - ## Required ActiveMQ Endpoint - # server = "192.168.50.10" - ## Required ActiveMQ port - # port = 8161 - ## Required username used for request HTTP Basic Authentication - # username = "admin" - ## Required password used for HTTP Basic Authentication - # password = "admin" - ## Required ActiveMQ webadmin root path - # webadmin = "admin" - ` + ## Required ActiveMQ Endpoint + # server = "192.168.50.10" + ## Required ActiveMQ port + # port = 8161 + ## Required username used for request HTTP Basic Authentication + # username = "admin" + ## Required password used for HTTP Basic Authentication + # password = "admin" + ## Required ActiveMQ webadmin root path + # webadmin = "admin" + ` func (a *ActiveMQ) Description() string { return "Gather ActiveMQ metrics" @@ -98,25 +99,20 @@ func (a *ActiveMQ) SampleConfig() string { func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) { client := &http.Client{} - url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword) req, err := http.NewRequest("GET", url, nil) - if err != nil { return nil, err } req.SetBasicAuth(a.Username, a.Password) - resp, err := client.Do(req) - if err != nil { return nil, err } defer resp.Body.Close() - return ioutil.ReadAll(resp.Body) } @@ -125,7 +121,7 @@ func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) records := make(map[string]interface{}) tags := make(map[string]string) - tags["name"] = queue.Name + tags["name"] = strings.TrimSpace(queue.Name) records["size"] = queue.Stats.Size records["consumer_count"] = queue.Stats.ConsumerCount @@ -176,39 +172,28 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { dataQueues, err := a.GetMetrics(QUEUES_STATS) - queues := Queues{} - err = xml.Unmarshal(dataQueues, &queues) - if err != nil { return err } dataTopics, err := a.GetMetrics(TOPICS_STATS) - topics := Topics{} - err = xml.Unmarshal(dataTopics, &topics) - if err != nil { return err } dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS) - subscribers := Subscribers{} - err = xml.Unmarshal(dataSubscribers, &subscribers) - if err != nil { return err } a.GatherQueuesMetrics(acc, queues) - a.GatherTopicsMetrics(acc, topics) - a.GatherSubscribersMetrics(acc, subscribers) return nil From 189c4290438c880188a2d501f3bb96e179814a5c Mon Sep 17 00:00:00 2001 From: "mohamed.labouardy@gmail.com" Date: Thu, 9 Aug 2018 16:54:49 +0200 Subject: [PATCH 4/7] add client field to activemq struct --- plugins/inputs/activemq/README.md | 42 +++++++++---- plugins/inputs/activemq/activemq.go | 76 ++++++++++++++++++++---- plugins/inputs/activemq/activemq_test.go | 12 +++- 3 files changed, 102 insertions(+), 28 deletions(-) diff --git a/plugins/inputs/activemq/README.md b/plugins/inputs/activemq/README.md index aa6198ed0739b..ee815375b46b9 100644 --- a/plugins/inputs/activemq/README.md +++ b/plugins/inputs/activemq/README.md @@ -17,18 +17,28 @@ This plugin gather queues, topics & subscribers metrics using ActiveMQ Console A password = "admin" ## Required ActiveMQ webadmin root path webadmin = "admin" + + ## Maximum time to receive response. + # response_timeout = "5s" + + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification + # insecure_skip_verify = false ``` ### Measurements & Fields: Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API. -- queues_metrics: +- activemq_queues: - size - consumer_count - enqueue_count - dequeue_count - - topics_metrics: + - activemq_topics: - size - consumer_count - enqueue_count @@ -42,28 +52,34 @@ Every effort was made to preserve the names based on the XML response from the A ### Tags: -- queues_metrics: +- activemq_queues: - name -- topics_metrics: + - source + - port +- activemq_topics: - name -- subscribers_metrics: + - source + - port +- activemq_subscribers: - client_id - subscription_name - connection_id - destination_name - selector - active + - source + - port ### Example Output: ``` $ ./telegraf -config telegraf.conf -input-filter activemq -test -queues_metrics,name=sandra,host=88284b2fe51b consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 -queues_metrics,name=Test,host=88284b2fe51b dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000 -topics_metrics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000 -topics_metrics,host=88284b2fe51b,name=AAA\ size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000 -topics_metrics,name=ActiveMQ.Advisory.Topic\ ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000 -topics_metrics,name=ActiveMQ.Advisory.Queue\ ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000 -topics_metrics,name=AAAA\ ,host=88284b2fe51b consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 -subscribers_metrics,connection_id=NOTSET,destination_name=AAA,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000 +activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 +activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000 +activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000 +activemq_topics,host=88284b2fe51b,name=AAA\,source=localhost,port=8161 size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000 +activemq_topics,name=ActiveMQ.Advisory.Topic\,source=localhost,port=8161 ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000 +activemq_topics,name=ActiveMQ.Advisory.Queue\,source=localhost,port=8161 ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000 +activemq_topics,name=AAAA\ ,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000 +activemq_subscribers,connection_id=NOTSET,destination_name=AAA,,source=localhost,port=8161,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000 ``` diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go index 81c0783593ec2..982ea024097d6 100644 --- a/plugins/inputs/activemq/activemq.go +++ b/plugins/inputs/activemq/activemq.go @@ -5,18 +5,26 @@ import ( "fmt" "io/ioutil" "net/http" + "time" + + "strings" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/plugins/inputs" - "strings" + "github.com/mlabouardy/telegraf/internal" + "github.com/mlabouardy/telegraf/internal/tls" ) type ActiveMQ struct { - Server string `json:"server"` - Port int `json:"port"` - Username string `json:"username"` - Password string `json:"password"` - Webadmin string `json:"webadmin"` + Server string `json:"server"` + Port int `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Webadmin string `json:"webadmin"` + ResponseTimeout internal.Duration + tls.ClientConfig + + client *http.Client } type Topics struct { @@ -87,6 +95,13 @@ var sampleConfig = ` # password = "admin" ## Required ActiveMQ webadmin root path # webadmin = "admin" + ## Maximum time to receive response. + # response_timeout = "5s" + ## Optional TLS Config + # tls_ca = "/etc/telegraf/ca.pem" + # tls_cert = "/etc/telegraf/cert.pem" + # tls_key = "/etc/telegraf/key.pem" + ## Use TLS but skip chain & host verification ` func (a *ActiveMQ) Description() string { @@ -97,8 +112,34 @@ func (a *ActiveMQ) SampleConfig() string { return sampleConfig } +func (a *ActiveMQ) createHttpClient() (*http.Client, error) { + tlsCfg, err := a.ClientConfig.TLSConfig() + if err != nil { + return nil, err + } + + client := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: tlsCfg, + }, + Timeout: a.ResponseTimeout.Duration, + } + + return client, nil +} + func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) { - client := &http.Client{} + if a.ResponseTimeout.Duration < time.Second { + a.ResponseTimeout.Duration = time.Second * 5 + } + + if a.client == nil { + client, err := a.createHttpClient() + if err != nil { + return nil, err + } + a.client = client + } url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword) req, err := http.NewRequest("GET", url, nil) @@ -107,7 +148,7 @@ func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) { } req.SetBasicAuth(a.Username, a.Password) - resp, err := client.Do(req) + resp, err := a.client.Do(req) if err != nil { return nil, err } @@ -122,13 +163,15 @@ func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) tags := make(map[string]string) tags["name"] = strings.TrimSpace(queue.Name) + tags["source"] = a.Server + tags["port"] = string(a.Port) records["size"] = queue.Stats.Size records["consumer_count"] = queue.Stats.ConsumerCount records["enqueue_count"] = queue.Stats.EnqueueCount records["dequeue_count"] = queue.Stats.DequeueCount - acc.AddFields("queues_metrics", records, tags) + acc.AddFields("activemq_queues", records, tags) } } @@ -138,13 +181,15 @@ func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) tags := make(map[string]string) tags["name"] = topic.Name + tags["source"] = a.Server + tags["port"] = string(a.Port) records["size"] = topic.Stats.Size records["consumer_count"] = topic.Stats.ConsumerCount records["enqueue_count"] = topic.Stats.EnqueueCount records["dequeue_count"] = topic.Stats.DequeueCount - acc.AddFields("topics_metrics", records, tags) + acc.AddFields("activemq_topics", records, tags) } } @@ -159,6 +204,8 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber tags["destination_name"] = subscriber.DestinationName tags["selector"] = subscriber.Selector tags["active"] = subscriber.Active + tags["source"] = a.Server + tags["port"] = string(a.Port) records["pending_queue_size"] = subscriber.Stats.PendingQueueSize records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize @@ -166,7 +213,7 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber records["enqueue_counter"] = subscriber.Stats.EnqueueCounter records["dequeue_counter"] = subscriber.Stats.DequeueCounter - acc.AddFields("subscribers_metrics", records, tags) + acc.AddFields("activemq_subscribers", records, tags) } } @@ -200,5 +247,10 @@ func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error { } func init() { - inputs.Add("activemq", func() telegraf.Input { return &ActiveMQ{} }) + inputs.Add("activemq", func() telegraf.Input { + return &ActiveMQ{ + Server: "localhost", + Port: 8161, + } + }) } diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go index 43552f56776bd..5529d4050de49 100644 --- a/plugins/inputs/activemq/activemq_test.go +++ b/plugins/inputs/activemq/activemq_test.go @@ -34,6 +34,8 @@ func TestGatherQueuesMetrics(t *testing.T) { tags := make(map[string]string) tags["name"] = "Test" + tags["source"] = "localhost" + tags["port"] = "8161" records["size"] = 0 records["consumer_count"] = 0 @@ -45,7 +47,7 @@ func TestGatherQueuesMetrics(t *testing.T) { activeMQ := new(ActiveMQ) activeMQ.GatherQueuesMetrics(&acc, queues) - acc.AssertContainsTaggedFields(t, "queues_metrics", records, tags) + acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags) } func TestGatherTopicsMetrics(t *testing.T) { @@ -76,6 +78,8 @@ func TestGatherTopicsMetrics(t *testing.T) { tags := make(map[string]string) tags["name"] = "ActiveMQ.Advisory.MasterBroker " + tags["source"] = "localhost" + tags["port"] = "8161" records["size"] = 0 records["consumer_count"] = 0 @@ -87,7 +91,7 @@ func TestGatherTopicsMetrics(t *testing.T) { activeMQ := new(ActiveMQ) activeMQ.GatherTopicsMetrics(&acc, topics) - acc.AssertContainsTaggedFields(t, "topics_metrics", records, tags) + acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags) } func TestGatherSubscribersMetrics(t *testing.T) { @@ -111,6 +115,8 @@ func TestGatherSubscribersMetrics(t *testing.T) { tags["destination_name"] = "AAA" tags["selector"] = "AA" tags["active"] = "no" + tags["source"] = "localhost" + tags["port"] = "8161" records["pending_queue_size"] = 0 records["dispatched_queue_size"] = 0 @@ -123,5 +129,5 @@ func TestGatherSubscribersMetrics(t *testing.T) { activeMQ := new(ActiveMQ) activeMQ.GatherSubscribersMetrics(&acc, subscribers) - acc.AssertContainsTaggedFields(t, "subscribers_metrics", records, tags) + acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags) } From 6a0acbb584df85325e6508031b33ec024fddffe7 Mon Sep 17 00:00:00 2001 From: "mohamed.labouardy@gmail.com" Date: Thu, 9 Aug 2018 17:02:09 +0200 Subject: [PATCH 5/7] fix package name --- plugins/inputs/activemq/activemq.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go index 982ea024097d6..e64f72f6605c9 100644 --- a/plugins/inputs/activemq/activemq.go +++ b/plugins/inputs/activemq/activemq.go @@ -10,9 +10,9 @@ import ( "strings" "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/internal/tls" "github.com/influxdata/telegraf/plugins/inputs" - "github.com/mlabouardy/telegraf/internal" - "github.com/mlabouardy/telegraf/internal/tls" ) type ActiveMQ struct { From c04543ea243ce65b36fecf01d2d4434f8fa7f916 Mon Sep 17 00:00:00 2001 From: "mohamed.labouardy@gmail.com" Date: Thu, 9 Aug 2018 21:15:57 +0200 Subject: [PATCH 6/7] fix unit test --- plugins/inputs/activemq/activemq.go | 7 ++++--- plugins/inputs/activemq/activemq_test.go | 6 ++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go index e64f72f6605c9..c91311d6c12a0 100644 --- a/plugins/inputs/activemq/activemq.go +++ b/plugins/inputs/activemq/activemq.go @@ -5,6 +5,7 @@ import ( "fmt" "io/ioutil" "net/http" + "strconv" "time" "strings" @@ -164,7 +165,7 @@ func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) tags["name"] = strings.TrimSpace(queue.Name) tags["source"] = a.Server - tags["port"] = string(a.Port) + tags["port"] = strconv.Itoa(a.Port) records["size"] = queue.Stats.Size records["consumer_count"] = queue.Stats.ConsumerCount @@ -182,7 +183,7 @@ func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) tags["name"] = topic.Name tags["source"] = a.Server - tags["port"] = string(a.Port) + tags["port"] = strconv.Itoa(a.Port) records["size"] = topic.Stats.Size records["consumer_count"] = topic.Stats.ConsumerCount @@ -205,7 +206,7 @@ func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscriber tags["selector"] = subscriber.Selector tags["active"] = subscriber.Active tags["source"] = a.Server - tags["port"] = string(a.Port) + tags["port"] = strconv.Itoa(a.Port) records["pending_queue_size"] = subscriber.Stats.PendingQueueSize records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go index 5529d4050de49..c277af3c5e72c 100644 --- a/plugins/inputs/activemq/activemq_test.go +++ b/plugins/inputs/activemq/activemq_test.go @@ -45,6 +45,8 @@ func TestGatherQueuesMetrics(t *testing.T) { var acc testutil.Accumulator activeMQ := new(ActiveMQ) + activeMQ.Server = "localhost" + activeMQ.Port = 8161 activeMQ.GatherQueuesMetrics(&acc, queues) acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags) @@ -89,6 +91,8 @@ func TestGatherTopicsMetrics(t *testing.T) { var acc testutil.Accumulator activeMQ := new(ActiveMQ) + activeMQ.Server = "localhost" + activeMQ.Port = 8161 activeMQ.GatherTopicsMetrics(&acc, topics) acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags) @@ -127,6 +131,8 @@ func TestGatherSubscribersMetrics(t *testing.T) { var acc testutil.Accumulator activeMQ := new(ActiveMQ) + activeMQ.Server = "localhost" + activeMQ.Port = 8161 activeMQ.GatherSubscribersMetrics(&acc, subscribers) acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags) From f410178440d0f8f50d627da2c0161b4d6d3fbc49 Mon Sep 17 00:00:00 2001 From: "mohamed.labouardy@gmail.com" Date: Fri, 10 Aug 2018 09:51:48 +0200 Subject: [PATCH 7/7] add line between config options --- plugins/inputs/activemq/README.md | 19 ++++++++++--------- plugins/inputs/activemq/activemq.go | 8 ++++++-- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/plugins/inputs/activemq/README.md b/plugins/inputs/activemq/README.md index ee815375b46b9..b44d12d22f07b 100644 --- a/plugins/inputs/activemq/README.md +++ b/plugins/inputs/activemq/README.md @@ -8,25 +8,26 @@ This plugin gather queues, topics & subscribers metrics using ActiveMQ Console A # Description [[inputs.activemq]] ## Required ActiveMQ Endpoint - server = "192.168.50.10" + # server = "192.168.50.10" + ## Required ActiveMQ port - port = 8161 - ## Required username used for request HTTP Basic Authentication - username = "admin" - ## Required password used for HTTP Basic Authentication - password = "admin" + # port = 8161 + + ## Credentials for basic HTTP authentication + # username = "admin" + # password = "admin" + ## Required ActiveMQ webadmin root path - webadmin = "admin" + # webadmin = "admin" ## Maximum time to receive response. # response_timeout = "5s" - + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem" # tls_key = "/etc/telegraf/key.pem" ## Use TLS but skip chain & host verification - # insecure_skip_verify = false ``` ### Measurements & Fields: diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go index c91311d6c12a0..5b59730d20d90 100644 --- a/plugins/inputs/activemq/activemq.go +++ b/plugins/inputs/activemq/activemq.go @@ -88,16 +88,20 @@ const ( var sampleConfig = ` ## Required ActiveMQ Endpoint # server = "192.168.50.10" + ## Required ActiveMQ port # port = 8161 - ## Required username used for request HTTP Basic Authentication + + ## Credentials for basic HTTP authentication # username = "admin" - ## Required password used for HTTP Basic Authentication # password = "admin" + ## Required ActiveMQ webadmin root path # webadmin = "admin" + ## Maximum time to receive response. # response_timeout = "5s" + ## Optional TLS Config # tls_ca = "/etc/telegraf/ca.pem" # tls_cert = "/etc/telegraf/cert.pem"