Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add MongoDB plugin #54

Merged
merged 3 commits into from
Jul 21, 2015
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
add MongoDB plugin
  • Loading branch information
jipperinbham committed Jul 7, 2015

Verified

This commit was signed with the committer’s verified signature. The key has expired.
juliusknorr Julius Knorr
commit 86145d5eb56c451df5d7206e5daca2a5645529f9
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -49,6 +49,8 @@ Telegraf currently has support for collecting metrics from:
* MySQL
* PostgreSQL
* Redis
* RethinkDB
* MongoDB

We'll be adding support for many more over the coming months. Read on if you want to add support for another service or third-party API.

12 changes: 10 additions & 2 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
@@ -99,11 +99,11 @@ servers = ["localhost"]
# postgres://[pqgotest[:password]]@localhost?sslmode=[disable|verify-ca|verify-full]
# or a simple string:
# host=localhost user=pqotest password=... sslmode=...
#
#
# All connection parameters are optional. By default, the host is localhost
# and the user is the currently running user. For localhost, we default
# to sslmode=disable as well.
#
#

address = "sslmode=disable"

@@ -124,6 +124,14 @@ address = "sslmode=disable"
# If no servers are specified, then localhost is used as the host.
servers = ["localhost"]

[mongodb]
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:[email protected]:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]

# Read metrics about swap memory usage
[swap]
# no configuration
1 change: 1 addition & 0 deletions plugins/all/all.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package all
import (
_ "github.com/influxdb/telegraf/plugins/kafka_consumer"
_ "github.com/influxdb/telegraf/plugins/memcached"
_ "github.com/influxdb/telegraf/plugins/mongodb"
_ "github.com/influxdb/telegraf/plugins/mysql"
_ "github.com/influxdb/telegraf/plugins/postgresql"
_ "github.com/influxdb/telegraf/plugins/redis"
110 changes: 110 additions & 0 deletions plugins/mongodb/mongodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package mongodb

import (
"fmt"
"net/url"
"sync"
"time"

"github.com/influxdb/telegraf/plugins"
"gopkg.in/mgo.v2"
)

type MongoDB struct {
Servers []string
mongos map[string]*Server
}

var sampleConfig = `
# An array of URI to gather stats about. Specify an ip or hostname
# with optional port add password. ie mongodb://user:[email protected]:27017,
# mongodb://10.10.3.33:18832, 10.0.0.1:10000, etc.
#
# If no servers are specified, then 127.0.0.1 is used as the host and 27107 as the port.
servers = ["127.0.0.1:27017"]`

func (m *MongoDB) SampleConfig() string {
return sampleConfig
}

func (*MongoDB) Description() string {
return "Read metrics from one or many MongoDB servers"
}

var localhost = &url.URL{Host: "127.0.0.1:27017"}

// Reads stats from all configured servers accumulates stats.
// Returns one of the errors encountered while gather stats (if any).
func (m *MongoDB) Gather(acc plugins.Accumulator) error {
if len(m.Servers) == 0 {
m.gatherServer(m.getMongoServer(localhost), acc)
return nil
}

var wg sync.WaitGroup

var outerr error

for _, serv := range m.Servers {
u, err := url.Parse(serv)
if err != nil {
return fmt.Errorf("Unable to parse to address '%s': %s", serv, err)
} else if u.Scheme == "" {
u.Scheme = "mongodb"
// fallback to simple string based address (i.e. "10.0.0.1:10000")
u.Host = serv
if u.Path == u.Host {
u.Path = ""
}
}
wg.Add(1)
go func() {
defer wg.Done()
outerr = m.gatherServer(m.getMongoServer(u), acc)
}()
}

wg.Wait()

return outerr
}

func (m *MongoDB) getMongoServer(url *url.URL) *Server {
if _, ok := m.mongos[url.Host]; !ok {
m.mongos[url.Host] = &Server{
Url: url,
}
}
return m.mongos[url.Host]
}

func (m *MongoDB) gatherServer(server *Server, acc plugins.Accumulator) error {
if server.Session == nil {
var dialAddrs []string
if server.Url.User != nil {
dialAddrs = []string{server.Url.String()}
} else {
dialAddrs = []string{server.Url.Host}
}
dialInfo, err := mgo.ParseURL(dialAddrs[0])
if err != nil {
return fmt.Errorf("Unable to parse URL (%s), %s\n", dialAddrs[0], err.Error())
}
dialInfo.Direct = true
dialInfo.Timeout = time.Duration(10) * time.Second
sess, err := mgo.DialWithInfo(dialInfo)
if err != nil {
return fmt.Errorf("Unable to connect to MongoDB, %s\n", err.Error())
}
server.Session = sess
}
return server.gatherData(acc)
}

func init() {
plugins.Add("mongodb", func() plugins.Plugin {
return &MongoDB{
mongos: make(map[string]*Server),
}
})
}
100 changes: 100 additions & 0 deletions plugins/mongodb/mongodb_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package mongodb

import (
"fmt"
"reflect"
"strconv"

"github.com/influxdb/telegraf/plugins"
)

type MongodbData struct {
StatLine *StatLine
Tags map[string]string
}

func NewMongodbData(statLine *StatLine, tags map[string]string) *MongodbData {
if statLine.NodeType != "" && statLine.NodeType != "UNK" {
tags["state"] = statLine.NodeType
}
return &MongodbData{
StatLine: statLine,
Tags: tags,
}
}

var DefaultStats = map[string]string{
"inserts_per_sec": "Insert",
"queries_per_sec": "Query",
"updates_per_sec": "Update",
"deletes_per_sec": "Delete",
"getmores_per_sec": "GetMore",
"commands_per_sec": "Command",
"flushes_per_sec": "Flushes",
"vsize_megabytes": "Virtual",
"resident_megabytes": "Resident",
"queued_reads": "QueuedReaders",
"queued_writes": "QueuedWriters",
"active_reads": "ActiveReaders",
"active_writes": "ActiveWriters",
"net_in_bytes": "NetIn",
"net_out_bytes": "NetOut",
"open_connections": "NumConnections",
}

var DefaultReplStats = map[string]string{
"repl_inserts_per_sec": "InsertR",
"repl_queries_per_sec": "QueryR",
"repl_updates_per_sec": "UpdateR",
"repl_deletes_per_sec": "DeleteR",
"repl_getmores_per_sec": "GetMoreR",
"repl_commands_per_sec": "CommandR",
"member_status": "NodeType",
}

var MmapStats = map[string]string{
"mapped_megabytes": "Mapped",
"non-mapped_megabytes": "NonMapped",
"page_faults_per_sec": "Faults",
}

var WiredTigerStats = map[string]string{
"percent_cache_dirty": "CacheDirtyPercent",
"percent_cache_used": "CacheUsedPercent",
}

func (d *MongodbData) AddDefaultStats(acc plugins.Accumulator) {
statLine := reflect.ValueOf(d.StatLine).Elem()
d.addStat(acc, statLine, DefaultStats)
if d.StatLine.NodeType != "" {
d.addStat(acc, statLine, DefaultReplStats)
}
if d.StatLine.StorageEngine == "mmapv1" {
d.addStat(acc, statLine, MmapStats)
} else if d.StatLine.StorageEngine == "wiredTiger" {
for key, value := range WiredTigerStats {
val := statLine.FieldByName(value).Interface()
percentVal := fmt.Sprintf("%.1f", val.(float64)*100)
floatVal, _ := strconv.ParseFloat(percentVal, 64)
d.add(acc, key, floatVal)
}
}
}

func (d *MongodbData) addStat(acc plugins.Accumulator, statLine reflect.Value, stats map[string]string) {
for key, value := range stats {
val := statLine.FieldByName(value).Interface()
d.add(acc, key, val)
}
}

func (d *MongodbData) add(acc plugins.Accumulator, key string, val interface{}) {
acc.AddValuesWithTime(
key,
map[string]interface{}{
"value": val,
},
d.Tags,
d.StatLine.Time,
)
}
111 changes: 111 additions & 0 deletions plugins/mongodb/mongodb_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package mongodb

import (
"testing"
"time"

"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var tags = make(map[string]string)

func TestAddNonReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
Insert: 0,
Query: 0,
Update: 0,
Delete: 0,
GetMore: 0,
Command: 0,
Flushes: 0,
Virtual: 0,
Resident: 0,
QueuedReaders: 0,
QueuedWriters: 0,
ActiveReaders: 0,
ActiveWriters: 0,
NetIn: 0,
NetOut: 0,
NumConnections: 0,
},
tags,
)
var acc testutil.Accumulator

d.AddDefaultStats(&acc)

for key, _ := range DefaultStats {
assert.True(t, acc.HasIntValue(key))
}
}

func TestAddReplStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "mmapv1",
Mapped: 0,
NonMapped: 0,
Faults: 0,
},
tags,
)

var acc testutil.Accumulator

d.AddDefaultStats(&acc)

for key, _ := range MmapStats {
assert.True(t, acc.HasIntValue(key))
}
}

func TestAddWiredTigerStats(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "wiredTiger",
CacheDirtyPercent: 0,
CacheUsedPercent: 0,
},
tags,
)

var acc testutil.Accumulator

d.AddDefaultStats(&acc)

for key, _ := range WiredTigerStats {
assert.True(t, acc.HasFloatValue(key))
}
}

func TestStateTag(t *testing.T) {
d := NewMongodbData(
&StatLine{
StorageEngine: "",
Time: time.Now(),
Insert: 0,
Query: 0,
NodeType: "PRI",
},
tags,
)

stats := []string{"inserts_per_sec", "queries_per_sec"}

stateTags := make(map[string]string)
stateTags["state"] = "PRI"

var acc testutil.Accumulator

d.AddDefaultStats(&acc)

for _, key := range stats {
err := acc.ValidateTaggedValue(key, int64(0), stateTags)
require.NoError(t, err)
}
}
Loading