diff --git a/README.md b/README.md
index 6307b5356f191..700f0dd2a49d2 100644
--- a/README.md
+++ b/README.md
@@ -127,6 +127,7 @@ configuration options.
## Input Plugins
+* [activemq](./plugins/inputs/activemq)
* [aerospike](./plugins/inputs/aerospike)
* [amqp_consumer](./plugins/inputs/amqp_consumer) (rabbitmq)
* [apache](./plugins/inputs/apache)
diff --git a/plugins/inputs/activemq/README.md b/plugins/inputs/activemq/README.md
new file mode 100644
index 0000000000000..b44d12d22f07b
--- /dev/null
+++ b/plugins/inputs/activemq/README.md
@@ -0,0 +1,86 @@
+# Telegraf Input Plugin: ActiveMQ
+
+This plugin gather queues, topics & subscribers metrics using ActiveMQ Console API.
+
+### Configuration:
+
+```toml
+# Description
+[[inputs.activemq]]
+ ## Required ActiveMQ Endpoint
+ # server = "192.168.50.10"
+
+ ## Required ActiveMQ port
+ # port = 8161
+
+ ## Credentials for basic HTTP authentication
+ # username = "admin"
+ # password = "admin"
+
+ ## Required ActiveMQ webadmin root path
+ # webadmin = "admin"
+
+ ## Maximum time to receive response.
+ # response_timeout = "5s"
+
+ ## Optional TLS Config
+ # tls_ca = "/etc/telegraf/ca.pem"
+ # tls_cert = "/etc/telegraf/cert.pem"
+ # tls_key = "/etc/telegraf/key.pem"
+ ## Use TLS but skip chain & host verification
+```
+
+### Measurements & Fields:
+
+Every effort was made to preserve the names based on the XML response from the ActiveMQ Console API.
+
+- activemq_queues:
+ - size
+ - consumer_count
+ - enqueue_count
+ - dequeue_count
+ - activemq_topics:
+ - size
+ - consumer_count
+ - enqueue_count
+ - dequeue_count
+ - subscribers_metrics:
+ - pending_queue_size
+ - dispatched_queue_size
+ - dispatched_counter
+ - enqueue_counter
+ - dequeue_counter
+
+### Tags:
+
+- activemq_queues:
+ - name
+ - source
+ - port
+- activemq_topics:
+ - name
+ - source
+ - port
+- activemq_subscribers:
+ - client_id
+ - subscription_name
+ - connection_id
+ - destination_name
+ - selector
+ - active
+ - source
+ - port
+
+### Example Output:
+
+```
+$ ./telegraf -config telegraf.conf -input-filter activemq -test
+activemq_queues,name=sandra,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
+activemq_queues,name=Test,host=88284b2fe51b,source=localhost,port=8161 dequeue_count=0i,size=0i,consumer_count=0i,enqueue_count=0i 1492610703000000000
+activemq_topics,name=ActiveMQ.Advisory.MasterBroker\ ,host=88284b2fe51b,source=localhost,port=8161 size=0i,consumer_count=0i,enqueue_count=1i,dequeue_count=0i 1492610703000000000
+activemq_topics,host=88284b2fe51b,name=AAA\,source=localhost,port=8161 size=0i,consumer_count=1i,enqueue_count=0i,dequeue_count=0i 1492610703000000000
+activemq_topics,name=ActiveMQ.Advisory.Topic\,source=localhost,port=8161 ,host=88284b2fe51b enqueue_count=1i,dequeue_count=0i,size=0i,consumer_count=0i 1492610703000000000
+activemq_topics,name=ActiveMQ.Advisory.Queue\,source=localhost,port=8161 ,host=88284b2fe51b size=0i,consumer_count=0i,enqueue_count=2i,dequeue_count=0i 1492610703000000000
+activemq_topics,name=AAAA\ ,host=88284b2fe51b,source=localhost,port=8161 consumer_count=0i,enqueue_count=0i,dequeue_count=0i,size=0i 1492610703000000000
+activemq_subscribers,connection_id=NOTSET,destination_name=AAA,,source=localhost,port=8161,selector=AA,active=no,host=88284b2fe51b,client_id=AAA,subscription_name=AAA pending_queue_size=0i,dispatched_queue_size=0i,dispatched_counter=0i,enqueue_counter=0i,dequeue_counter=0i 1492610703000000000
+```
diff --git a/plugins/inputs/activemq/activemq.go b/plugins/inputs/activemq/activemq.go
new file mode 100644
index 0000000000000..5b59730d20d90
--- /dev/null
+++ b/plugins/inputs/activemq/activemq.go
@@ -0,0 +1,261 @@
+package activemq
+
+import (
+ "encoding/xml"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "strconv"
+ "time"
+
+ "strings"
+
+ "github.com/influxdata/telegraf"
+ "github.com/influxdata/telegraf/internal"
+ "github.com/influxdata/telegraf/internal/tls"
+ "github.com/influxdata/telegraf/plugins/inputs"
+)
+
+type ActiveMQ struct {
+ Server string `json:"server"`
+ Port int `json:"port"`
+ Username string `json:"username"`
+ Password string `json:"password"`
+ Webadmin string `json:"webadmin"`
+ ResponseTimeout internal.Duration
+ tls.ClientConfig
+
+ client *http.Client
+}
+
+type Topics struct {
+ XMLName xml.Name `xml:"topics"`
+ TopicItems []Topic `xml:"topic"`
+}
+
+type Topic struct {
+ XMLName xml.Name `xml:"topic"`
+ Name string `xml:"name,attr"`
+ Stats Stats `xml:"stats"`
+}
+
+type Subscribers struct {
+ XMLName xml.Name `xml:"subscribers"`
+ SubscriberItems []Subscriber `xml:"subscriber"`
+}
+
+type Subscriber struct {
+ XMLName xml.Name `xml:"subscriber"`
+ ClientId string `xml:"clientId,attr"`
+ SubscriptionName string `xml:"subscriptionName,attr"`
+ ConnectionId string `xml:"connectionId,attr"`
+ DestinationName string `xml:"destinationName,attr"`
+ Selector string `xml:"selector,attr"`
+ Active string `xml:"active,attr"`
+ Stats Stats `xml:"stats"`
+}
+
+type Queues struct {
+ XMLName xml.Name `xml:"queues"`
+ QueueItems []Queue `xml:"queue"`
+}
+
+type Queue struct {
+ XMLName xml.Name `xml:"queue"`
+ Name string `xml:"name,attr"`
+ Stats Stats `xml:"stats"`
+}
+
+type Stats struct {
+ XMLName xml.Name `xml:"stats"`
+ Size int `xml:"size,attr"`
+ ConsumerCount int `xml:"consumerCount,attr"`
+ EnqueueCount int `xml:"enqueueCount,attr"`
+ DequeueCount int `xml:"dequeueCount,attr"`
+ PendingQueueSize int `xml:"pendingQueueSize,attr"`
+ DispatchedQueueSize int `xml:"dispatchedQueueSize,attr"`
+ DispatchedCounter int `xml:"dispatchedCounter,attr"`
+ EnqueueCounter int `xml:"enqueueCounter,attr"`
+ DequeueCounter int `xml:"dequeueCounter,attr"`
+}
+
+const (
+ QUEUES_STATS = "queues"
+ TOPICS_STATS = "topics"
+ SUBSCRIBERS_STATS = "subscribers"
+)
+
+var sampleConfig = `
+ ## Required ActiveMQ Endpoint
+ # server = "192.168.50.10"
+
+ ## Required ActiveMQ port
+ # port = 8161
+
+ ## Credentials for basic HTTP authentication
+ # username = "admin"
+ # password = "admin"
+
+ ## Required ActiveMQ webadmin root path
+ # webadmin = "admin"
+
+ ## Maximum time to receive response.
+ # response_timeout = "5s"
+
+ ## Optional TLS Config
+ # tls_ca = "/etc/telegraf/ca.pem"
+ # tls_cert = "/etc/telegraf/cert.pem"
+ # tls_key = "/etc/telegraf/key.pem"
+ ## Use TLS but skip chain & host verification
+ `
+
+func (a *ActiveMQ) Description() string {
+ return "Gather ActiveMQ metrics"
+}
+
+func (a *ActiveMQ) SampleConfig() string {
+ return sampleConfig
+}
+
+func (a *ActiveMQ) createHttpClient() (*http.Client, error) {
+ tlsCfg, err := a.ClientConfig.TLSConfig()
+ if err != nil {
+ return nil, err
+ }
+
+ client := &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: tlsCfg,
+ },
+ Timeout: a.ResponseTimeout.Duration,
+ }
+
+ return client, nil
+}
+
+func (a *ActiveMQ) GetMetrics(keyword string) ([]byte, error) {
+ if a.ResponseTimeout.Duration < time.Second {
+ a.ResponseTimeout.Duration = time.Second * 5
+ }
+
+ if a.client == nil {
+ client, err := a.createHttpClient()
+ if err != nil {
+ return nil, err
+ }
+ a.client = client
+ }
+ url := fmt.Sprintf("http://%s:%d/%s/xml/%s.jsp", a.Server, a.Port, a.Webadmin, keyword)
+
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ req.SetBasicAuth(a.Username, a.Password)
+ resp, err := a.client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+
+ defer resp.Body.Close()
+ return ioutil.ReadAll(resp.Body)
+}
+
+func (a *ActiveMQ) GatherQueuesMetrics(acc telegraf.Accumulator, queues Queues) {
+ for _, queue := range queues.QueueItems {
+ records := make(map[string]interface{})
+ tags := make(map[string]string)
+
+ tags["name"] = strings.TrimSpace(queue.Name)
+ tags["source"] = a.Server
+ tags["port"] = strconv.Itoa(a.Port)
+
+ records["size"] = queue.Stats.Size
+ records["consumer_count"] = queue.Stats.ConsumerCount
+ records["enqueue_count"] = queue.Stats.EnqueueCount
+ records["dequeue_count"] = queue.Stats.DequeueCount
+
+ acc.AddFields("activemq_queues", records, tags)
+ }
+}
+
+func (a *ActiveMQ) GatherTopicsMetrics(acc telegraf.Accumulator, topics Topics) {
+ for _, topic := range topics.TopicItems {
+ records := make(map[string]interface{})
+ tags := make(map[string]string)
+
+ tags["name"] = topic.Name
+ tags["source"] = a.Server
+ tags["port"] = strconv.Itoa(a.Port)
+
+ records["size"] = topic.Stats.Size
+ records["consumer_count"] = topic.Stats.ConsumerCount
+ records["enqueue_count"] = topic.Stats.EnqueueCount
+ records["dequeue_count"] = topic.Stats.DequeueCount
+
+ acc.AddFields("activemq_topics", records, tags)
+ }
+}
+
+func (a *ActiveMQ) GatherSubscribersMetrics(acc telegraf.Accumulator, subscribers Subscribers) {
+ for _, subscriber := range subscribers.SubscriberItems {
+ records := make(map[string]interface{})
+ tags := make(map[string]string)
+
+ tags["client_id"] = subscriber.ClientId
+ tags["subscription_name"] = subscriber.SubscriptionName
+ tags["connection_id"] = subscriber.ConnectionId
+ tags["destination_name"] = subscriber.DestinationName
+ tags["selector"] = subscriber.Selector
+ tags["active"] = subscriber.Active
+ tags["source"] = a.Server
+ tags["port"] = strconv.Itoa(a.Port)
+
+ records["pending_queue_size"] = subscriber.Stats.PendingQueueSize
+ records["dispatched_queue_size"] = subscriber.Stats.DispatchedQueueSize
+ records["dispatched_counter"] = subscriber.Stats.DispatchedCounter
+ records["enqueue_counter"] = subscriber.Stats.EnqueueCounter
+ records["dequeue_counter"] = subscriber.Stats.DequeueCounter
+
+ acc.AddFields("activemq_subscribers", records, tags)
+ }
+}
+
+func (a *ActiveMQ) Gather(acc telegraf.Accumulator) error {
+ dataQueues, err := a.GetMetrics(QUEUES_STATS)
+ queues := Queues{}
+ err = xml.Unmarshal(dataQueues, &queues)
+ if err != nil {
+ return err
+ }
+
+ dataTopics, err := a.GetMetrics(TOPICS_STATS)
+ topics := Topics{}
+ err = xml.Unmarshal(dataTopics, &topics)
+ if err != nil {
+ return err
+ }
+
+ dataSubscribers, err := a.GetMetrics(SUBSCRIBERS_STATS)
+ subscribers := Subscribers{}
+ err = xml.Unmarshal(dataSubscribers, &subscribers)
+ if err != nil {
+ return err
+ }
+
+ a.GatherQueuesMetrics(acc, queues)
+ a.GatherTopicsMetrics(acc, topics)
+ a.GatherSubscribersMetrics(acc, subscribers)
+
+ return nil
+}
+
+func init() {
+ inputs.Add("activemq", func() telegraf.Input {
+ return &ActiveMQ{
+ Server: "localhost",
+ Port: 8161,
+ }
+ })
+}
diff --git a/plugins/inputs/activemq/activemq_test.go b/plugins/inputs/activemq/activemq_test.go
new file mode 100644
index 0000000000000..c277af3c5e72c
--- /dev/null
+++ b/plugins/inputs/activemq/activemq_test.go
@@ -0,0 +1,139 @@
+package activemq
+
+import (
+ "encoding/xml"
+ "testing"
+
+ "github.com/influxdata/telegraf/testutil"
+)
+
+func TestGatherQueuesMetrics(t *testing.T) {
+
+ s := `
+
+
+
+queueBrowse/sandra?view=rss&feedType=atom_1.0
+queueBrowse/sandra?view=rss&feedType=rss_2.0
+
+
+
+
+
+queueBrowse/Test?view=rss&feedType=atom_1.0
+queueBrowse/Test?view=rss&feedType=rss_2.0
+
+
+`
+
+ queues := Queues{}
+
+ xml.Unmarshal([]byte(s), &queues)
+
+ records := make(map[string]interface{})
+ tags := make(map[string]string)
+
+ tags["name"] = "Test"
+ tags["source"] = "localhost"
+ tags["port"] = "8161"
+
+ records["size"] = 0
+ records["consumer_count"] = 0
+ records["enqueue_count"] = 0
+ records["dequeue_count"] = 0
+
+ var acc testutil.Accumulator
+
+ activeMQ := new(ActiveMQ)
+ activeMQ.Server = "localhost"
+ activeMQ.Port = 8161
+
+ activeMQ.GatherQueuesMetrics(&acc, queues)
+ acc.AssertContainsTaggedFields(t, "activemq_queues", records, tags)
+}
+
+func TestGatherTopicsMetrics(t *testing.T) {
+
+ s := `
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+`
+
+ topics := Topics{}
+
+ xml.Unmarshal([]byte(s), &topics)
+
+ records := make(map[string]interface{})
+ tags := make(map[string]string)
+
+ tags["name"] = "ActiveMQ.Advisory.MasterBroker "
+ tags["source"] = "localhost"
+ tags["port"] = "8161"
+
+ records["size"] = 0
+ records["consumer_count"] = 0
+ records["enqueue_count"] = 1
+ records["dequeue_count"] = 0
+
+ var acc testutil.Accumulator
+
+ activeMQ := new(ActiveMQ)
+ activeMQ.Server = "localhost"
+ activeMQ.Port = 8161
+
+ activeMQ.GatherTopicsMetrics(&acc, topics)
+ acc.AssertContainsTaggedFields(t, "activemq_topics", records, tags)
+}
+
+func TestGatherSubscribersMetrics(t *testing.T) {
+
+ s := `
+
+
+
+`
+
+ subscribers := Subscribers{}
+
+ xml.Unmarshal([]byte(s), &subscribers)
+
+ records := make(map[string]interface{})
+ tags := make(map[string]string)
+
+ tags["client_id"] = "AAA"
+ tags["subscription_name"] = "AAA"
+ tags["connection_id"] = "NOTSET"
+ tags["destination_name"] = "AAA"
+ tags["selector"] = "AA"
+ tags["active"] = "no"
+ tags["source"] = "localhost"
+ tags["port"] = "8161"
+
+ records["pending_queue_size"] = 0
+ records["dispatched_queue_size"] = 0
+ records["dispatched_counter"] = 0
+ records["enqueue_counter"] = 0
+ records["dequeue_counter"] = 0
+
+ var acc testutil.Accumulator
+
+ activeMQ := new(ActiveMQ)
+ activeMQ.Server = "localhost"
+ activeMQ.Port = 8161
+
+ activeMQ.GatherSubscribersMetrics(&acc, subscribers)
+ acc.AssertContainsTaggedFields(t, "activemq_subscribers", records, tags)
+}
diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go
index bbca99521817b..ac86fb87985e2 100644
--- a/plugins/inputs/all/all.go
+++ b/plugins/inputs/all/all.go
@@ -1,6 +1,7 @@
package all
import (
+ _ "github.com/influxdata/telegraf/plugins/inputs/activemq"
_ "github.com/influxdata/telegraf/plugins/inputs/aerospike"
_ "github.com/influxdata/telegraf/plugins/inputs/amqp_consumer"
_ "github.com/influxdata/telegraf/plugins/inputs/apache"