Skip to content

Commit

Permalink
Query oplog only when connected to a replica set (influxdata#6307)
Browse files Browse the repository at this point in the history
  • Loading branch information
danielnelson authored Aug 27, 2019
1 parent 701339b commit 2d2e793
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 149 deletions.
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (a *Agent) Test(ctx context.Context, waitDuration time.Duration) error {
// Special instructions for some inputs. cpu, for example, needs to be
// run twice in order to return cpu usage percentages.
switch input.Config.Name {
case "inputs.cpu", "inputs.mongodb", "inputs.procstat":
case "cpu", "mongodb", "procstat":
nulAcc := NewAccumulator(input, nulC)
nulAcc.SetPrecision(a.Precision())
if err := input.Input.Gather(nulAcc); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion plugins/inputs/mongodb/mongodb_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ var DefaultReplStats = map[string]string{
"member_status": "NodeType",
"state": "NodeState",
"repl_lag": "ReplLag",
"repl_oplog_window_sec": "OplogTimeDiff",
}

var DefaultClusterStats = map[string]string{
Expand Down Expand Up @@ -230,6 +229,11 @@ func (d *MongodbData) AddDefaultStats() {
if d.StatLine.NodeType != "" {
d.addStat(statLine, DefaultReplStats)
}

if d.StatLine.OplogStats != nil {
d.add("repl_oplog_window_sec", d.StatLine.OplogStats.TimeDiff)
}

d.addStat(statLine, DefaultClusterStats)
d.addStat(statLine, DefaultShardStats)
if d.StatLine.StorageEngine == "mmapv1" || d.StatLine.StorageEngine == "rocksdb" {
Expand Down
1 change: 0 additions & 1 deletion plugins/inputs/mongodb/mongodb_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ func TestStateTag(t *testing.T) {
"repl_updates": int64(0),
"repl_updates_per_sec": int64(0),
"repl_lag": int64(0),
"repl_oplog_window_sec": int64(0),
"resident_megabytes": int64(0),
"updates": int64(0),
"updates_per_sec": int64(0),
Expand Down
245 changes: 150 additions & 95 deletions plugins/inputs/mongodb/mongodb_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,35 +39,116 @@ func authLogLevel(err error) string {
}
}

func (s *Server) gatherOplogStats() *OplogStats {
stats := &OplogStats{}
localdb := s.Session.DB("local")
func (s *Server) gatherServerStatus() (*ServerStatus, error) {
serverStatus := &ServerStatus{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "serverStatus",
Value: 1,
},
{
Name: "recordStats",
Value: 0,
},
}, serverStatus)
if err != nil {
return nil, err
}
return serverStatus, nil
}

func (s *Server) gatherReplSetStatus() (*ReplSetStatus, error) {
replSetStatus := &ReplSetStatus{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "replSetGetStatus",
Value: 1,
},
}, replSetStatus)
if err != nil {
return nil, err
}
return replSetStatus, nil
}

func (s *Server) gatherClusterStatus() (*ClusterStatus, error) {
chunkCount, err := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count()
if err != nil {
return nil, err
}

return &ClusterStatus{
JumboChunksCount: int64(chunkCount),
}, nil
}

func (s *Server) gatherShardConnPoolStats() (*ShardStats, error) {
shardStats := &ShardStats{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "shardConnPoolStats",
Value: 1,
},
}, &shardStats)
if err != nil {
return nil, err
}
return shardStats, nil
}

op_first := oplogEntry{}
op_last := oplogEntry{}
func (s *Server) gatherDBStats(name string) (*Db, error) {
stats := &DbStatsData{}
err := s.Session.DB(name).Run(bson.D{
{
Name: "dbStats",
Value: 1,
},
}, stats)
if err != nil {
return nil, err
}

return &Db{
Name: name,
DbStatsData: stats,
}, nil
}

func (s *Server) getOplogReplLag(collection string) (*OplogStats, error) {
query := bson.M{"ts": bson.M{"$exists": true}}

for _, collection_name := range []string{"oplog.rs", "oplog.$main"} {
if err := localdb.C(collection_name).Find(query).Sort("$natural").Limit(1).One(&op_first); err != nil {
if err == mgo.ErrNotFound {
continue
}
log.Printf("%s [inputs.mongodb] Error getting first oplog entry: %v", authLogLevel(err), err)
return stats
}
if err := localdb.C(collection_name).Find(query).Sort("-$natural").Limit(1).One(&op_last); err != nil {
if err == mgo.ErrNotFound || IsAuthorization(err) {
continue
}
log.Printf("%s [inputs.mongodb] Error getting first oplog entry: %v", authLogLevel(err), err)
return stats
}
var first oplogEntry
err := s.Session.DB("local").C(collection).Find(query).Sort("$natural").Limit(1).One(&first)
if err != nil {
return nil, err
}

var last oplogEntry
err = s.Session.DB("local").C(collection).Find(query).Sort("-$natural").Limit(1).One(&last)
if err != nil {
return nil, err
}

firstTime := time.Unix(int64(first.Timestamp>>32), 0)
lastTime := time.Unix(int64(last.Timestamp>>32), 0)
stats := &OplogStats{
TimeDiff: int64(lastTime.Sub(firstTime).Seconds()),
}
return stats, nil
}

// The "oplog.rs" collection is stored on all replica set members.
//
// The "oplog.$main" collection is created on the master node of a
// master-slave replicated deployment. As of MongoDB 3.2, master-slave
// replication has been deprecated.
func (s *Server) gatherOplogStats() (*OplogStats, error) {
stats, err := s.getOplogReplLag("oplog.rs")
if err == nil {
return stats, nil
}

op_first_time := time.Unix(int64(op_first.Timestamp>>32), 0)
op_last_time := time.Unix(int64(op_last.Timestamp>>32), 0)
stats.TimeDiff = int64(op_last_time.Sub(op_first_time).Seconds())
return stats
return s.getOplogReplLag("oplog.$main")
}

func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error) {
Expand Down Expand Up @@ -112,99 +193,71 @@ func (s *Server) gatherCollectionStats(colStatsDbs []string) (*ColStats, error)
func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gatherColStats bool, colStatsDbs []string) error {
s.Session.SetMode(mgo.Eventual, true)
s.Session.SetSocketTimeout(0)
result_server := &ServerStatus{}
err := s.Session.DB("admin").Run(bson.D{
{
Name: "serverStatus",
Value: 1,
},
{
Name: "recordStats",
Value: 0,
},
}, result_server)

serverStatus, err := s.gatherServerStatus()
if err != nil {
return err
}
result_repl := &ReplSetStatus{}
// ignore error because it simply indicates that the db is not a member
// in a replica set, which is fine.
_ = s.Session.DB("admin").Run(bson.D{
{
Name: "replSetGetStatus",
Value: 1,
},
}, result_repl)

jumbo_chunks, _ := s.Session.DB("config").C("chunks").Find(bson.M{"jumbo": true}).Count()
// Get replica set status, an error indicates that the server is not a
// member of a replica set.
replSetStatus, err := s.gatherReplSetStatus()
if err != nil {
log.Printf("D! [inputs.mongodb] Unable to gather replica set status: %v", err)
}

result_cluster := &ClusterStatus{
JumboChunksCount: int64(jumbo_chunks),
// Gather the oplog if we are a member of a replica set. Non-replica set
// members do not have the oplog collections.
var oplogStats *OplogStats
if replSetStatus != nil {
oplogStats, err = s.gatherOplogStats()
if err != nil {
return err
}
}

resultShards := &ShardStats{}
err = s.Session.DB("admin").Run(bson.D{
{
Name: "shardConnPoolStats",
Value: 1,
},
}, &resultShards)
clusterStatus, err := s.gatherClusterStatus()
if err != nil {
if IsAuthorization(err) {
log.Printf("D! [inputs.mongodb] Error getting database shard stats: %v", err)
} else {
log.Printf("E! [inputs.mongodb] Error getting database shard stats: %v", err)
}
log.Printf("D! [inputs.mongodb] Unable to gather cluster status: %v", err)
}

oplogStats := s.gatherOplogStats()
shardStats, err := s.gatherShardConnPoolStats()
if err != nil {
log.Printf("%s [inputs.mongodb] Unable to gather shard connection pool stats: %v",
authLogLevel(err), err)
}

result_db_stats := &DbStats{}
if gatherDbStats == true {
names := []string{}
names, err = s.Session.DatabaseNames()
collectionStats, err := s.gatherCollectionStats(colStatsDbs)
if err != nil {
return err
}

dbStats := &DbStats{}
if gatherDbStats {
names, err := s.Session.DatabaseNames()
if err != nil {
log.Printf("E! [inputs.mongodb] Error getting database names: %v", err)
return err
}
for _, db_name := range names {
db_stat_line := &DbStatsData{}
err = s.Session.DB(db_name).Run(bson.D{
{
Name: "dbStats",
Value: 1,
},
}, db_stat_line)

for _, name := range names {
db, err := s.gatherDBStats(name)
if err != nil {
log.Printf("E! [inputs.mongodb] Error getting db stats from %q: %v", db_name, err)
}
db := &Db{
Name: db_name,
DbStatsData: db_stat_line,
log.Printf("D! [inputs.mongodb] Error getting db stats from %q: %v", name, err)
}

result_db_stats.Dbs = append(result_db_stats.Dbs, *db)
dbStats.Dbs = append(dbStats.Dbs, *db)
}
}

result_col_stats, err := s.gatherCollectionStats(colStatsDbs)
if err != nil {
return err
}

result := &MongoStatus{
ServerStatus: result_server,
ReplSetStatus: result_repl,
ClusterStatus: result_cluster,
DbStats: result_db_stats,
ColStats: result_col_stats,
ShardStats: resultShards,
ServerStatus: serverStatus,
ReplSetStatus: replSetStatus,
ClusterStatus: clusterStatus,
DbStats: dbStats,
ColStats: collectionStats,
ShardStats: shardStats,
OplogStats: oplogStats,
}

defer func() {
s.lastResult = result
}()

result.SampleTime = time.Now()
if s.lastResult != nil && result != nil {
duration := result.SampleTime.Sub(s.lastResult.SampleTime)
Expand All @@ -222,6 +275,8 @@ func (s *Server) gatherData(acc telegraf.Accumulator, gatherDbStats bool, gather
data.AddShardHostStats()
data.flush(acc)
}

s.lastResult = result
return nil
}

Expand Down
Loading

0 comments on commit 2d2e793

Please sign in to comment.