Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add input plugin for McRouter #4077

Merged
merged 1 commit into from
May 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/logparser"
_ "github.com/influxdata/telegraf/plugins/inputs/lustre2"
_ "github.com/influxdata/telegraf/plugins/inputs/mailchimp"
_ "github.com/influxdata/telegraf/plugins/inputs/mcrouter"
_ "github.com/influxdata/telegraf/plugins/inputs/memcached"
_ "github.com/influxdata/telegraf/plugins/inputs/mesos"
_ "github.com/influxdata/telegraf/plugins/inputs/minecraft"
Expand Down
103 changes: 103 additions & 0 deletions plugins/inputs/mcrouter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
# Mcrouter Input Plugin

This plugin gathers statistics data from a Mcrouter server.

### Configuration:

```toml
# Read metrics from one or many mcrouter servers.
[[inputs.mcrouter]]
## An array of address to gather stats about. Specify an ip or hostname
## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc.
servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"]

## Timeout for metric collections from all servers. Minimum timeout is "1s".
# timeout = "5s"
```

### Measurements & Fields:

The fields from this plugin are gathered in the *mcrouter* measurement.

Description of gathered fields can be found [here](https://github.com/facebook/mcrouter/wiki/Stats-list).

Fields:

* uptime
* num_servers
* num_servers_new
* num_servers_up
* num_servers_down
* num_servers_closed
* num_clients
* num_suspect_servers
* destination_batches_sum
* destination_requests_sum
* outstanding_route_get_reqs_queued
* outstanding_route_update_reqs_queued
* outstanding_route_get_avg_queue_size
* outstanding_route_update_avg_queue_size
* outstanding_route_get_avg_wait_time_sec
* outstanding_route_update_avg_wait_time_sec
* retrans_closed_connections
* destination_pending_reqs
* destination_inflight_reqs
* destination_batch_size
* asynclog_requests
* proxy_reqs_processing
* proxy_reqs_waiting
* client_queue_notify_period
* rusage_system
* rusage_user
* ps_num_minor_faults
* ps_num_major_faults
* ps_user_time_sec
* ps_system_time_sec
* ps_vsize
* ps_rss
* fibers_allocated
* fibers_pool_size
* fibers_stack_high_watermark
* successful_client_connections
* duration_us
* destination_max_pending_reqs
* destination_max_inflight_reqs
* retrans_per_kbyte_max
* cmd_get_count
* cmd_delete_out
* cmd_lease_get
* cmd_set
* cmd_get_out_all
* cmd_get_out
* cmd_lease_set_count
* cmd_other_out_all
* cmd_lease_get_out
* cmd_set_count
* cmd_lease_set_out
* cmd_delete_count
* cmd_other
* cmd_delete
* cmd_get
* cmd_lease_set
* cmd_set_out
* cmd_lease_get_count
* cmd_other_out
* cmd_lease_get_out_all
* cmd_set_out_all
* cmd_other_count
* cmd_delete_out_all
* cmd_lease_set_out_all

### Tags:

* Mcrouter measurements have the following tags:
- server (the host name from which metrics are gathered)



### Example Output:

```
$ ./telegraf --config telegraf.conf --input-filter mcrouter --test
mcrouter,server=localhost:11211 uptime=166,num_servers=1,num_servers_new=1,num_servers_up=0,num_servers_down=0,num_servers_closed=0,num_clients=1,num_suspect_servers=0,destination_batches_sum=0,destination_requests_sum=0,outstanding_route_get_reqs_queued=0,outstanding_route_update_reqs_queued=0,outstanding_route_get_avg_queue_size=0,outstanding_route_update_avg_queue_size=0,outstanding_route_get_avg_wait_time_sec=0,outstanding_route_update_avg_wait_time_sec=0,retrans_closed_connections=0,destination_pending_reqs=0,destination_inflight_reqs=0,destination_batch_size=0,asynclog_requests=0,proxy_reqs_processing=1,proxy_reqs_waiting=0,client_queue_notify_period=0,rusage_system=0.040966,rusage_user=0.020483,ps_num_minor_faults=2490,ps_num_major_faults=11,ps_user_time_sec=0.02,ps_system_time_sec=0.04,ps_vsize=697741312,ps_rss=10563584,fibers_allocated=0,fibers_pool_size=0,fibers_stack_high_watermark=0,successful_client_connections=18,duration_us=0,destination_max_pending_reqs=0,destination_max_inflight_reqs=0,retrans_per_kbyte_max=0,cmd_get_count=0,cmd_delete_out=0,cmd_lease_get=0,cmd_set=0,cmd_get_out_all=0,cmd_get_out=0,cmd_lease_set_count=0,cmd_other_out_all=0,cmd_lease_get_out=0,cmd_set_count=0,cmd_lease_set_out=0,cmd_delete_count=0,cmd_other=0,cmd_delete=0,cmd_get=0,cmd_lease_set=0,cmd_set_out=0,cmd_lease_get_count=0,cmd_other_out=0,cmd_lease_get_out_all=0,cmd_set_out_all=0,cmd_other_count=0,cmd_delete_out_all=0,cmd_lease_set_out_all=0 1453831884664956455
```
286 changes: 286 additions & 0 deletions plugins/inputs/mcrouter/mcrouter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
package mcrouter

import (
"bufio"
"context"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

// Mcrouter is a mcrouter plugin
type Mcrouter struct {
Servers []string
Timeout internal.Duration
}

// enum for statType
type statType int

const (
typeInt statType = iota
typeFloat statType = iota
)

var sampleConfig = `
## An array of address to gather stats about. Specify an ip or hostname
## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc.
servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"]

## Timeout for metric collections from all servers. Minimum timeout is "1s".
# timeout = "5s"
`

var defaultTimeout = 5 * time.Second

var defaultServerURL = url.URL{
Scheme: "tcp",
Host: "localhost:11211",
}

// The list of metrics that should be sent
var sendMetrics = map[string]statType{
"uptime": typeInt,
"num_servers": typeInt,
"num_servers_new": typeInt,
"num_servers_up": typeInt,
"num_servers_down": typeInt,
"num_servers_closed": typeInt,
"num_clients": typeInt,
"num_suspect_servers": typeInt,
"destination_batches_sum": typeInt,
"destination_requests_sum": typeInt,
"outstanding_route_get_reqs_queued": typeInt,
"outstanding_route_update_reqs_queued": typeInt,
"outstanding_route_get_avg_queue_size": typeInt,
"outstanding_route_update_avg_queue_size": typeInt,
"outstanding_route_get_avg_wait_time_sec": typeInt,
"outstanding_route_update_avg_wait_time_sec": typeInt,
"retrans_closed_connections": typeInt,
"destination_pending_reqs": typeInt,
"destination_inflight_reqs": typeInt,
"destination_batch_size": typeInt,
"asynclog_requests": typeInt,
"proxy_reqs_processing": typeInt,
"proxy_reqs_waiting": typeInt,
"client_queue_notify_period": typeInt,
"rusage_system": typeFloat,
"rusage_user": typeFloat,
"ps_num_minor_faults": typeInt,
"ps_num_major_faults": typeInt,
"ps_user_time_sec": typeFloat,
"ps_system_time_sec": typeFloat,
"ps_vsize": typeInt,
"ps_rss": typeInt,
"fibers_allocated": typeInt,
"fibers_pool_size": typeInt,
"fibers_stack_high_watermark": typeInt,
"successful_client_connections": typeInt,
"duration_us": typeInt,
"destination_max_pending_reqs": typeInt,
"destination_max_inflight_reqs": typeInt,
"retrans_per_kbyte_max": typeInt,
"cmd_get_count": typeInt,
"cmd_delete_out": typeInt,
"cmd_lease_get": typeInt,
"cmd_set": typeInt,
"cmd_get_out_all": typeInt,
"cmd_get_out": typeInt,
"cmd_lease_set_count": typeInt,
"cmd_other_out_all": typeInt,
"cmd_lease_get_out": typeInt,
"cmd_set_count": typeInt,
"cmd_lease_set_out": typeInt,
"cmd_delete_count": typeInt,
"cmd_other": typeInt,
"cmd_delete": typeInt,
"cmd_get": typeInt,
"cmd_lease_set": typeInt,
"cmd_set_out": typeInt,
"cmd_lease_get_count": typeInt,
"cmd_other_out": typeInt,
"cmd_lease_get_out_all": typeInt,
"cmd_set_out_all": typeInt,
"cmd_other_count": typeInt,
"cmd_delete_out_all": typeInt,
"cmd_lease_set_out_all": typeInt,
}

// SampleConfig returns sample configuration message
func (m *Mcrouter) SampleConfig() string {
return sampleConfig
}

// Description returns description of Mcrouter plugin
func (m *Mcrouter) Description() string {
return "Read metrics from one or many mcrouter servers"
}

// Gather reads stats from all configured servers accumulates stats
func (m *Mcrouter) Gather(acc telegraf.Accumulator) error {
ctx := context.Background()

if m.Timeout.Duration < 1*time.Second {
m.Timeout.Duration = defaultTimeout
}

ctx, cancel := context.WithTimeout(ctx, m.Timeout.Duration)
defer cancel()

if len(m.Servers) == 0 {
m.Servers = []string{defaultServerURL.String()}
}

for _, serverAddress := range m.Servers {
acc.AddError(m.gatherServer(ctx, serverAddress, acc))
}

return nil
}

// ParseAddress parses an address string into 'host:port' and 'protocol' parts
func (m *Mcrouter) ParseAddress(address string) (string, string, error) {
var protocol string
var host string
var port string

u, parseError := url.Parse(address)

if parseError != nil {
return "", "", fmt.Errorf("Invalid server address")
}

if u.Scheme != "tcp" && u.Scheme != "unix" {
return "", "", fmt.Errorf("Invalid server protocol")
}

protocol = u.Scheme

if protocol == "unix" {
if u.Path == "" {
return "", "", fmt.Errorf("Invalid unix socket path")
}

address = u.Path
} else {
if u.Host == "" {
return "", "", fmt.Errorf("Invalid host")
}

host = u.Hostname()
port = u.Port()

if host == "" {
host = defaultServerURL.Hostname()
}

if port == "" {
port = defaultServerURL.Port()
}

address = host + ":" + port
}

return address, protocol, nil
}

func (m *Mcrouter) gatherServer(ctx context.Context, address string, acc telegraf.Accumulator) error {
var conn net.Conn
var err error
var protocol string
var dialer net.Dialer

address, protocol, err = m.ParseAddress(address)

conn, err = dialer.DialContext(ctx, protocol, address)

if err != nil {
return err
}

defer conn.Close()

// Extend connection
deadline, ok := ctx.Deadline()

if ok {
conn.SetDeadline(deadline)
}

// Read and write buffer
reader := bufio.NewReader(conn)
scanner := bufio.NewScanner(reader)

// Send command
if _, err := fmt.Fprint(conn, "stats\r\n"); err != nil {
return err
}

values, err := parseResponse(scanner)

if err != nil {
return err
}

// Add server address as a tag
tags := map[string]string{"server": address}

// Process values
fields := make(map[string]interface{})
for key, sType := range sendMetrics {
if value, ok := values[key]; ok {
switch sType {
case typeInt:
if v, errParse := strconv.ParseInt(value, 10, 64); errParse == nil {
fields[key] = v
}
case typeFloat:
if v, errParse := strconv.ParseFloat(value, 64); errParse == nil {
fields[key] = v
}
default:
}
}
}
acc.AddFields("mcrouter", fields, tags)
return nil
}

func parseResponse(r *bufio.Scanner) (map[string]string, error) {
values := make(map[string]string)

for r.Scan() {
// Read line
line := r.Text()

// Done
if line == "END" {
break
}

// Read values
s := strings.SplitN(line, " ", 3)

if len(s) != 3 || s[0] != "STAT" {
return nil, fmt.Errorf("unexpected line in stats response: %s", line)
}

// Save values
values[s[1]] = s[2]
}

return values, nil
}

func init() {
inputs.Add("mcrouter", func() telegraf.Input {
return &Mcrouter{}
})
}
Loading