-
Notifications
You must be signed in to change notification settings - Fork 5.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add input plugin for McRouter (#4077)
- Loading branch information
1 parent
cb0472c
commit 9803d62
Showing
4 changed files
with
640 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
# Mcrouter Input Plugin | ||
|
||
This plugin gathers statistics data from a Mcrouter server. | ||
|
||
### Configuration: | ||
|
||
```toml | ||
# Read metrics from one or many mcrouter servers. | ||
[[inputs.mcrouter]] | ||
## An array of address to gather stats about. Specify an ip or hostname | ||
## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc. | ||
servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"] | ||
|
||
## Timeout for metric collections from all servers. Minimum timeout is "1s". | ||
# timeout = "5s" | ||
``` | ||
|
||
### Measurements & Fields: | ||
|
||
The fields from this plugin are gathered in the *mcrouter* measurement. | ||
|
||
Description of gathered fields can be found [here](https://github.com/facebook/mcrouter/wiki/Stats-list). | ||
|
||
Fields: | ||
|
||
* uptime | ||
* num_servers | ||
* num_servers_new | ||
* num_servers_up | ||
* num_servers_down | ||
* num_servers_closed | ||
* num_clients | ||
* num_suspect_servers | ||
* destination_batches_sum | ||
* destination_requests_sum | ||
* outstanding_route_get_reqs_queued | ||
* outstanding_route_update_reqs_queued | ||
* outstanding_route_get_avg_queue_size | ||
* outstanding_route_update_avg_queue_size | ||
* outstanding_route_get_avg_wait_time_sec | ||
* outstanding_route_update_avg_wait_time_sec | ||
* retrans_closed_connections | ||
* destination_pending_reqs | ||
* destination_inflight_reqs | ||
* destination_batch_size | ||
* asynclog_requests | ||
* proxy_reqs_processing | ||
* proxy_reqs_waiting | ||
* client_queue_notify_period | ||
* rusage_system | ||
* rusage_user | ||
* ps_num_minor_faults | ||
* ps_num_major_faults | ||
* ps_user_time_sec | ||
* ps_system_time_sec | ||
* ps_vsize | ||
* ps_rss | ||
* fibers_allocated | ||
* fibers_pool_size | ||
* fibers_stack_high_watermark | ||
* successful_client_connections | ||
* duration_us | ||
* destination_max_pending_reqs | ||
* destination_max_inflight_reqs | ||
* retrans_per_kbyte_max | ||
* cmd_get_count | ||
* cmd_delete_out | ||
* cmd_lease_get | ||
* cmd_set | ||
* cmd_get_out_all | ||
* cmd_get_out | ||
* cmd_lease_set_count | ||
* cmd_other_out_all | ||
* cmd_lease_get_out | ||
* cmd_set_count | ||
* cmd_lease_set_out | ||
* cmd_delete_count | ||
* cmd_other | ||
* cmd_delete | ||
* cmd_get | ||
* cmd_lease_set | ||
* cmd_set_out | ||
* cmd_lease_get_count | ||
* cmd_other_out | ||
* cmd_lease_get_out_all | ||
* cmd_set_out_all | ||
* cmd_other_count | ||
* cmd_delete_out_all | ||
* cmd_lease_set_out_all | ||
|
||
### Tags: | ||
|
||
* Mcrouter measurements have the following tags: | ||
- server (the host name from which metrics are gathered) | ||
|
||
|
||
|
||
### Example Output: | ||
|
||
``` | ||
$ ./telegraf --config telegraf.conf --input-filter mcrouter --test | ||
mcrouter,server=localhost:11211 uptime=166,num_servers=1,num_servers_new=1,num_servers_up=0,num_servers_down=0,num_servers_closed=0,num_clients=1,num_suspect_servers=0,destination_batches_sum=0,destination_requests_sum=0,outstanding_route_get_reqs_queued=0,outstanding_route_update_reqs_queued=0,outstanding_route_get_avg_queue_size=0,outstanding_route_update_avg_queue_size=0,outstanding_route_get_avg_wait_time_sec=0,outstanding_route_update_avg_wait_time_sec=0,retrans_closed_connections=0,destination_pending_reqs=0,destination_inflight_reqs=0,destination_batch_size=0,asynclog_requests=0,proxy_reqs_processing=1,proxy_reqs_waiting=0,client_queue_notify_period=0,rusage_system=0.040966,rusage_user=0.020483,ps_num_minor_faults=2490,ps_num_major_faults=11,ps_user_time_sec=0.02,ps_system_time_sec=0.04,ps_vsize=697741312,ps_rss=10563584,fibers_allocated=0,fibers_pool_size=0,fibers_stack_high_watermark=0,successful_client_connections=18,duration_us=0,destination_max_pending_reqs=0,destination_max_inflight_reqs=0,retrans_per_kbyte_max=0,cmd_get_count=0,cmd_delete_out=0,cmd_lease_get=0,cmd_set=0,cmd_get_out_all=0,cmd_get_out=0,cmd_lease_set_count=0,cmd_other_out_all=0,cmd_lease_get_out=0,cmd_set_count=0,cmd_lease_set_out=0,cmd_delete_count=0,cmd_other=0,cmd_delete=0,cmd_get=0,cmd_lease_set=0,cmd_set_out=0,cmd_lease_get_count=0,cmd_other_out=0,cmd_lease_get_out_all=0,cmd_set_out_all=0,cmd_other_count=0,cmd_delete_out_all=0,cmd_lease_set_out_all=0 1453831884664956455 | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,286 @@ | ||
package mcrouter | ||
|
||
import ( | ||
"bufio" | ||
"context" | ||
"fmt" | ||
"net" | ||
"net/url" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/internal" | ||
"github.com/influxdata/telegraf/plugins/inputs" | ||
) | ||
|
||
// Mcrouter is a mcrouter plugin | ||
type Mcrouter struct { | ||
Servers []string | ||
Timeout internal.Duration | ||
} | ||
|
||
// enum for statType | ||
type statType int | ||
|
||
const ( | ||
typeInt statType = iota | ||
typeFloat statType = iota | ||
) | ||
|
||
var sampleConfig = ` | ||
## An array of address to gather stats about. Specify an ip or hostname | ||
## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc. | ||
servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"] | ||
## Timeout for metric collections from all servers. Minimum timeout is "1s". | ||
# timeout = "5s" | ||
` | ||
|
||
var defaultTimeout = 5 * time.Second | ||
|
||
var defaultServerURL = url.URL{ | ||
Scheme: "tcp", | ||
Host: "localhost:11211", | ||
} | ||
|
||
// The list of metrics that should be sent | ||
var sendMetrics = map[string]statType{ | ||
"uptime": typeInt, | ||
"num_servers": typeInt, | ||
"num_servers_new": typeInt, | ||
"num_servers_up": typeInt, | ||
"num_servers_down": typeInt, | ||
"num_servers_closed": typeInt, | ||
"num_clients": typeInt, | ||
"num_suspect_servers": typeInt, | ||
"destination_batches_sum": typeInt, | ||
"destination_requests_sum": typeInt, | ||
"outstanding_route_get_reqs_queued": typeInt, | ||
"outstanding_route_update_reqs_queued": typeInt, | ||
"outstanding_route_get_avg_queue_size": typeInt, | ||
"outstanding_route_update_avg_queue_size": typeInt, | ||
"outstanding_route_get_avg_wait_time_sec": typeInt, | ||
"outstanding_route_update_avg_wait_time_sec": typeInt, | ||
"retrans_closed_connections": typeInt, | ||
"destination_pending_reqs": typeInt, | ||
"destination_inflight_reqs": typeInt, | ||
"destination_batch_size": typeInt, | ||
"asynclog_requests": typeInt, | ||
"proxy_reqs_processing": typeInt, | ||
"proxy_reqs_waiting": typeInt, | ||
"client_queue_notify_period": typeInt, | ||
"rusage_system": typeFloat, | ||
"rusage_user": typeFloat, | ||
"ps_num_minor_faults": typeInt, | ||
"ps_num_major_faults": typeInt, | ||
"ps_user_time_sec": typeFloat, | ||
"ps_system_time_sec": typeFloat, | ||
"ps_vsize": typeInt, | ||
"ps_rss": typeInt, | ||
"fibers_allocated": typeInt, | ||
"fibers_pool_size": typeInt, | ||
"fibers_stack_high_watermark": typeInt, | ||
"successful_client_connections": typeInt, | ||
"duration_us": typeInt, | ||
"destination_max_pending_reqs": typeInt, | ||
"destination_max_inflight_reqs": typeInt, | ||
"retrans_per_kbyte_max": typeInt, | ||
"cmd_get_count": typeInt, | ||
"cmd_delete_out": typeInt, | ||
"cmd_lease_get": typeInt, | ||
"cmd_set": typeInt, | ||
"cmd_get_out_all": typeInt, | ||
"cmd_get_out": typeInt, | ||
"cmd_lease_set_count": typeInt, | ||
"cmd_other_out_all": typeInt, | ||
"cmd_lease_get_out": typeInt, | ||
"cmd_set_count": typeInt, | ||
"cmd_lease_set_out": typeInt, | ||
"cmd_delete_count": typeInt, | ||
"cmd_other": typeInt, | ||
"cmd_delete": typeInt, | ||
"cmd_get": typeInt, | ||
"cmd_lease_set": typeInt, | ||
"cmd_set_out": typeInt, | ||
"cmd_lease_get_count": typeInt, | ||
"cmd_other_out": typeInt, | ||
"cmd_lease_get_out_all": typeInt, | ||
"cmd_set_out_all": typeInt, | ||
"cmd_other_count": typeInt, | ||
"cmd_delete_out_all": typeInt, | ||
"cmd_lease_set_out_all": typeInt, | ||
} | ||
|
||
// SampleConfig returns sample configuration message | ||
func (m *Mcrouter) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
// Description returns description of Mcrouter plugin | ||
func (m *Mcrouter) Description() string { | ||
return "Read metrics from one or many mcrouter servers" | ||
} | ||
|
||
// Gather reads stats from all configured servers accumulates stats | ||
func (m *Mcrouter) Gather(acc telegraf.Accumulator) error { | ||
ctx := context.Background() | ||
|
||
if m.Timeout.Duration < 1*time.Second { | ||
m.Timeout.Duration = defaultTimeout | ||
} | ||
|
||
ctx, cancel := context.WithTimeout(ctx, m.Timeout.Duration) | ||
defer cancel() | ||
|
||
if len(m.Servers) == 0 { | ||
m.Servers = []string{defaultServerURL.String()} | ||
} | ||
|
||
for _, serverAddress := range m.Servers { | ||
acc.AddError(m.gatherServer(ctx, serverAddress, acc)) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ParseAddress parses an address string into 'host:port' and 'protocol' parts | ||
func (m *Mcrouter) ParseAddress(address string) (string, string, error) { | ||
var protocol string | ||
var host string | ||
var port string | ||
|
||
u, parseError := url.Parse(address) | ||
|
||
if parseError != nil { | ||
return "", "", fmt.Errorf("Invalid server address") | ||
} | ||
|
||
if u.Scheme != "tcp" && u.Scheme != "unix" { | ||
return "", "", fmt.Errorf("Invalid server protocol") | ||
} | ||
|
||
protocol = u.Scheme | ||
|
||
if protocol == "unix" { | ||
if u.Path == "" { | ||
return "", "", fmt.Errorf("Invalid unix socket path") | ||
} | ||
|
||
address = u.Path | ||
} else { | ||
if u.Host == "" { | ||
return "", "", fmt.Errorf("Invalid host") | ||
} | ||
|
||
host = u.Hostname() | ||
port = u.Port() | ||
|
||
if host == "" { | ||
host = defaultServerURL.Hostname() | ||
} | ||
|
||
if port == "" { | ||
port = defaultServerURL.Port() | ||
} | ||
|
||
address = host + ":" + port | ||
} | ||
|
||
return address, protocol, nil | ||
} | ||
|
||
func (m *Mcrouter) gatherServer(ctx context.Context, address string, acc telegraf.Accumulator) error { | ||
var conn net.Conn | ||
var err error | ||
var protocol string | ||
var dialer net.Dialer | ||
|
||
address, protocol, err = m.ParseAddress(address) | ||
|
||
conn, err = dialer.DialContext(ctx, protocol, address) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
|
||
defer conn.Close() | ||
|
||
// Extend connection | ||
deadline, ok := ctx.Deadline() | ||
|
||
if ok { | ||
conn.SetDeadline(deadline) | ||
} | ||
|
||
// Read and write buffer | ||
reader := bufio.NewReader(conn) | ||
scanner := bufio.NewScanner(reader) | ||
|
||
// Send command | ||
if _, err := fmt.Fprint(conn, "stats\r\n"); err != nil { | ||
return err | ||
} | ||
|
||
values, err := parseResponse(scanner) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
|
||
// Add server address as a tag | ||
tags := map[string]string{"server": address} | ||
|
||
// Process values | ||
fields := make(map[string]interface{}) | ||
for key, sType := range sendMetrics { | ||
if value, ok := values[key]; ok { | ||
switch sType { | ||
case typeInt: | ||
if v, errParse := strconv.ParseInt(value, 10, 64); errParse == nil { | ||
fields[key] = v | ||
} | ||
case typeFloat: | ||
if v, errParse := strconv.ParseFloat(value, 64); errParse == nil { | ||
fields[key] = v | ||
} | ||
default: | ||
} | ||
} | ||
} | ||
acc.AddFields("mcrouter", fields, tags) | ||
return nil | ||
} | ||
|
||
func parseResponse(r *bufio.Scanner) (map[string]string, error) { | ||
values := make(map[string]string) | ||
|
||
for r.Scan() { | ||
// Read line | ||
line := r.Text() | ||
|
||
// Done | ||
if line == "END" { | ||
break | ||
} | ||
|
||
// Read values | ||
s := strings.SplitN(line, " ", 3) | ||
|
||
if len(s) != 3 || s[0] != "STAT" { | ||
return nil, fmt.Errorf("unexpected line in stats response: %s", line) | ||
} | ||
|
||
// Save values | ||
values[s[1]] = s[2] | ||
} | ||
|
||
return values, nil | ||
} | ||
|
||
func init() { | ||
inputs.Add("mcrouter", func() telegraf.Input { | ||
return &Mcrouter{} | ||
}) | ||
} |
Oops, something went wrong.