diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 440d1e9a563ef..f4a630ecc010c 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -50,6 +50,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/logparser" _ "github.com/influxdata/telegraf/plugins/inputs/lustre2" _ "github.com/influxdata/telegraf/plugins/inputs/mailchimp" + _ "github.com/influxdata/telegraf/plugins/inputs/mcrouter" _ "github.com/influxdata/telegraf/plugins/inputs/memcached" _ "github.com/influxdata/telegraf/plugins/inputs/mesos" _ "github.com/influxdata/telegraf/plugins/inputs/minecraft" diff --git a/plugins/inputs/mcrouter/README.md b/plugins/inputs/mcrouter/README.md new file mode 100644 index 0000000000000..05c2597869e05 --- /dev/null +++ b/plugins/inputs/mcrouter/README.md @@ -0,0 +1,103 @@ +# Mcrouter Input Plugin + +This plugin gathers statistics data from a Mcrouter server. + +### Configuration: + +```toml +# Read metrics from one or many mcrouter servers. +[[inputs.mcrouter]] + ## An array of address to gather stats about. Specify an ip or hostname + ## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc. + servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"] + + ## Timeout for metric collections from all servers. Minimum timeout is "1s". + # timeout = "5s" +``` + +### Measurements & Fields: + +The fields from this plugin are gathered in the *mcrouter* measurement. + +Description of gathered fields can be found [here](https://github.com/facebook/mcrouter/wiki/Stats-list). + +Fields: + +* uptime +* num_servers +* num_servers_new +* num_servers_up +* num_servers_down +* num_servers_closed +* num_clients +* num_suspect_servers +* destination_batches_sum +* destination_requests_sum +* outstanding_route_get_reqs_queued +* outstanding_route_update_reqs_queued +* outstanding_route_get_avg_queue_size +* outstanding_route_update_avg_queue_size +* outstanding_route_get_avg_wait_time_sec +* outstanding_route_update_avg_wait_time_sec +* retrans_closed_connections +* destination_pending_reqs +* destination_inflight_reqs +* destination_batch_size +* asynclog_requests +* proxy_reqs_processing +* proxy_reqs_waiting +* client_queue_notify_period +* rusage_system +* rusage_user +* ps_num_minor_faults +* ps_num_major_faults +* ps_user_time_sec +* ps_system_time_sec +* ps_vsize +* ps_rss +* fibers_allocated +* fibers_pool_size +* fibers_stack_high_watermark +* successful_client_connections +* duration_us +* destination_max_pending_reqs +* destination_max_inflight_reqs +* retrans_per_kbyte_max +* cmd_get_count +* cmd_delete_out +* cmd_lease_get +* cmd_set +* cmd_get_out_all +* cmd_get_out +* cmd_lease_set_count +* cmd_other_out_all +* cmd_lease_get_out +* cmd_set_count +* cmd_lease_set_out +* cmd_delete_count +* cmd_other +* cmd_delete +* cmd_get +* cmd_lease_set +* cmd_set_out +* cmd_lease_get_count +* cmd_other_out +* cmd_lease_get_out_all +* cmd_set_out_all +* cmd_other_count +* cmd_delete_out_all +* cmd_lease_set_out_all + +### Tags: + +* Mcrouter measurements have the following tags: + - server (the host name from which metrics are gathered) + + + +### Example Output: + +``` +$ ./telegraf --config telegraf.conf --input-filter mcrouter --test +mcrouter,server=localhost:11211 uptime=166,num_servers=1,num_servers_new=1,num_servers_up=0,num_servers_down=0,num_servers_closed=0,num_clients=1,num_suspect_servers=0,destination_batches_sum=0,destination_requests_sum=0,outstanding_route_get_reqs_queued=0,outstanding_route_update_reqs_queued=0,outstanding_route_get_avg_queue_size=0,outstanding_route_update_avg_queue_size=0,outstanding_route_get_avg_wait_time_sec=0,outstanding_route_update_avg_wait_time_sec=0,retrans_closed_connections=0,destination_pending_reqs=0,destination_inflight_reqs=0,destination_batch_size=0,asynclog_requests=0,proxy_reqs_processing=1,proxy_reqs_waiting=0,client_queue_notify_period=0,rusage_system=0.040966,rusage_user=0.020483,ps_num_minor_faults=2490,ps_num_major_faults=11,ps_user_time_sec=0.02,ps_system_time_sec=0.04,ps_vsize=697741312,ps_rss=10563584,fibers_allocated=0,fibers_pool_size=0,fibers_stack_high_watermark=0,successful_client_connections=18,duration_us=0,destination_max_pending_reqs=0,destination_max_inflight_reqs=0,retrans_per_kbyte_max=0,cmd_get_count=0,cmd_delete_out=0,cmd_lease_get=0,cmd_set=0,cmd_get_out_all=0,cmd_get_out=0,cmd_lease_set_count=0,cmd_other_out_all=0,cmd_lease_get_out=0,cmd_set_count=0,cmd_lease_set_out=0,cmd_delete_count=0,cmd_other=0,cmd_delete=0,cmd_get=0,cmd_lease_set=0,cmd_set_out=0,cmd_lease_get_count=0,cmd_other_out=0,cmd_lease_get_out_all=0,cmd_set_out_all=0,cmd_other_count=0,cmd_delete_out_all=0,cmd_lease_set_out_all=0 1453831884664956455 +``` diff --git a/plugins/inputs/mcrouter/mcrouter.go b/plugins/inputs/mcrouter/mcrouter.go new file mode 100644 index 0000000000000..1ae5c79ea7c7e --- /dev/null +++ b/plugins/inputs/mcrouter/mcrouter.go @@ -0,0 +1,286 @@ +package mcrouter + +import ( + "bufio" + "context" + "fmt" + "net" + "net/url" + "strconv" + "strings" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/internal" + "github.com/influxdata/telegraf/plugins/inputs" +) + +// Mcrouter is a mcrouter plugin +type Mcrouter struct { + Servers []string + Timeout internal.Duration +} + +// enum for statType +type statType int + +const ( + typeInt statType = iota + typeFloat statType = iota +) + +var sampleConfig = ` + ## An array of address to gather stats about. Specify an ip or hostname + ## with port. ie tcp://localhost:11211, tcp://10.0.0.1:11211, etc. + servers = ["tcp://localhost:11211", "unix:///var/run/mcrouter.sock"] + + ## Timeout for metric collections from all servers. Minimum timeout is "1s". + # timeout = "5s" +` + +var defaultTimeout = 5 * time.Second + +var defaultServerURL = url.URL{ + Scheme: "tcp", + Host: "localhost:11211", +} + +// The list of metrics that should be sent +var sendMetrics = map[string]statType{ + "uptime": typeInt, + "num_servers": typeInt, + "num_servers_new": typeInt, + "num_servers_up": typeInt, + "num_servers_down": typeInt, + "num_servers_closed": typeInt, + "num_clients": typeInt, + "num_suspect_servers": typeInt, + "destination_batches_sum": typeInt, + "destination_requests_sum": typeInt, + "outstanding_route_get_reqs_queued": typeInt, + "outstanding_route_update_reqs_queued": typeInt, + "outstanding_route_get_avg_queue_size": typeInt, + "outstanding_route_update_avg_queue_size": typeInt, + "outstanding_route_get_avg_wait_time_sec": typeInt, + "outstanding_route_update_avg_wait_time_sec": typeInt, + "retrans_closed_connections": typeInt, + "destination_pending_reqs": typeInt, + "destination_inflight_reqs": typeInt, + "destination_batch_size": typeInt, + "asynclog_requests": typeInt, + "proxy_reqs_processing": typeInt, + "proxy_reqs_waiting": typeInt, + "client_queue_notify_period": typeInt, + "rusage_system": typeFloat, + "rusage_user": typeFloat, + "ps_num_minor_faults": typeInt, + "ps_num_major_faults": typeInt, + "ps_user_time_sec": typeFloat, + "ps_system_time_sec": typeFloat, + "ps_vsize": typeInt, + "ps_rss": typeInt, + "fibers_allocated": typeInt, + "fibers_pool_size": typeInt, + "fibers_stack_high_watermark": typeInt, + "successful_client_connections": typeInt, + "duration_us": typeInt, + "destination_max_pending_reqs": typeInt, + "destination_max_inflight_reqs": typeInt, + "retrans_per_kbyte_max": typeInt, + "cmd_get_count": typeInt, + "cmd_delete_out": typeInt, + "cmd_lease_get": typeInt, + "cmd_set": typeInt, + "cmd_get_out_all": typeInt, + "cmd_get_out": typeInt, + "cmd_lease_set_count": typeInt, + "cmd_other_out_all": typeInt, + "cmd_lease_get_out": typeInt, + "cmd_set_count": typeInt, + "cmd_lease_set_out": typeInt, + "cmd_delete_count": typeInt, + "cmd_other": typeInt, + "cmd_delete": typeInt, + "cmd_get": typeInt, + "cmd_lease_set": typeInt, + "cmd_set_out": typeInt, + "cmd_lease_get_count": typeInt, + "cmd_other_out": typeInt, + "cmd_lease_get_out_all": typeInt, + "cmd_set_out_all": typeInt, + "cmd_other_count": typeInt, + "cmd_delete_out_all": typeInt, + "cmd_lease_set_out_all": typeInt, +} + +// SampleConfig returns sample configuration message +func (m *Mcrouter) SampleConfig() string { + return sampleConfig +} + +// Description returns description of Mcrouter plugin +func (m *Mcrouter) Description() string { + return "Read metrics from one or many mcrouter servers" +} + +// Gather reads stats from all configured servers accumulates stats +func (m *Mcrouter) Gather(acc telegraf.Accumulator) error { + ctx := context.Background() + + if m.Timeout.Duration < 1*time.Second { + m.Timeout.Duration = defaultTimeout + } + + ctx, cancel := context.WithTimeout(ctx, m.Timeout.Duration) + defer cancel() + + if len(m.Servers) == 0 { + m.Servers = []string{defaultServerURL.String()} + } + + for _, serverAddress := range m.Servers { + acc.AddError(m.gatherServer(ctx, serverAddress, acc)) + } + + return nil +} + +// ParseAddress parses an address string into 'host:port' and 'protocol' parts +func (m *Mcrouter) ParseAddress(address string) (string, string, error) { + var protocol string + var host string + var port string + + u, parseError := url.Parse(address) + + if parseError != nil { + return "", "", fmt.Errorf("Invalid server address") + } + + if u.Scheme != "tcp" && u.Scheme != "unix" { + return "", "", fmt.Errorf("Invalid server protocol") + } + + protocol = u.Scheme + + if protocol == "unix" { + if u.Path == "" { + return "", "", fmt.Errorf("Invalid unix socket path") + } + + address = u.Path + } else { + if u.Host == "" { + return "", "", fmt.Errorf("Invalid host") + } + + host = u.Hostname() + port = u.Port() + + if host == "" { + host = defaultServerURL.Hostname() + } + + if port == "" { + port = defaultServerURL.Port() + } + + address = host + ":" + port + } + + return address, protocol, nil +} + +func (m *Mcrouter) gatherServer(ctx context.Context, address string, acc telegraf.Accumulator) error { + var conn net.Conn + var err error + var protocol string + var dialer net.Dialer + + address, protocol, err = m.ParseAddress(address) + + conn, err = dialer.DialContext(ctx, protocol, address) + + if err != nil { + return err + } + + defer conn.Close() + + // Extend connection + deadline, ok := ctx.Deadline() + + if ok { + conn.SetDeadline(deadline) + } + + // Read and write buffer + reader := bufio.NewReader(conn) + scanner := bufio.NewScanner(reader) + + // Send command + if _, err := fmt.Fprint(conn, "stats\r\n"); err != nil { + return err + } + + values, err := parseResponse(scanner) + + if err != nil { + return err + } + + // Add server address as a tag + tags := map[string]string{"server": address} + + // Process values + fields := make(map[string]interface{}) + for key, sType := range sendMetrics { + if value, ok := values[key]; ok { + switch sType { + case typeInt: + if v, errParse := strconv.ParseInt(value, 10, 64); errParse == nil { + fields[key] = v + } + case typeFloat: + if v, errParse := strconv.ParseFloat(value, 64); errParse == nil { + fields[key] = v + } + default: + } + } + } + acc.AddFields("mcrouter", fields, tags) + return nil +} + +func parseResponse(r *bufio.Scanner) (map[string]string, error) { + values := make(map[string]string) + + for r.Scan() { + // Read line + line := r.Text() + + // Done + if line == "END" { + break + } + + // Read values + s := strings.SplitN(line, " ", 3) + + if len(s) != 3 || s[0] != "STAT" { + return nil, fmt.Errorf("unexpected line in stats response: %s", line) + } + + // Save values + values[s[1]] = s[2] + } + + return values, nil +} + +func init() { + inputs.Add("mcrouter", func() telegraf.Input { + return &Mcrouter{} + }) +} diff --git a/plugins/inputs/mcrouter/mcrouter_test.go b/plugins/inputs/mcrouter/mcrouter_test.go new file mode 100644 index 0000000000000..e17c13b6d6655 --- /dev/null +++ b/plugins/inputs/mcrouter/mcrouter_test.go @@ -0,0 +1,250 @@ +package mcrouter + +import ( + "bufio" + "strings" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestAddressParsing(t *testing.T) { + m := &Mcrouter{ + Servers: []string{"tcp://" + testutil.GetLocalHost()}, + } + + var acceptTests = [][3]string{ + {"tcp://localhost:8086", "localhost:8086", "tcp"}, + {"tcp://localhost", "localhost:" + defaultServerURL.Port(), "tcp"}, + {"tcp://localhost:", "localhost:" + defaultServerURL.Port(), "tcp"}, + {"tcp://:8086", defaultServerURL.Hostname() + ":8086", "tcp"}, + {"tcp://:", defaultServerURL.Host, "tcp"}, + } + + var rejectTests = []string{ + "tcp://", + } + + for _, args := range acceptTests { + address, protocol, err := m.ParseAddress(args[0]) + + assert.Nil(t, err, args[0]) + assert.True(t, address == args[1], args[0]) + assert.True(t, protocol == args[2], args[0]) + } + + for _, addr := range rejectTests { + address, protocol, err := m.ParseAddress(addr) + + assert.NotNil(t, err, addr) + assert.Empty(t, address, addr) + assert.Empty(t, protocol, addr) + } +} + +func TestMcrouterGeneratesMetrics(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + m := &Mcrouter{ + Servers: []string{"tcp://" + testutil.GetLocalHost()}, + } + + var acc testutil.Accumulator + + err := acc.GatherError(m.Gather) + require.NoError(t, err) + + intMetrics := []string{"uptime", "num_servers", "num_servers_new", "num_servers_up", + "num_servers_down", "num_servers_closed", "num_clients", + "num_suspect_servers", "destination_batches_sum", "destination_requests_sum", + "outstanding_route_get_reqs_queued", "outstanding_route_update_reqs_queued", + "outstanding_route_get_avg_queue_size", "outstanding_route_update_avg_queue_size", + "outstanding_route_get_avg_wait_time_sec", "outstanding_route_update_avg_wait_time_sec", + "retrans_closed_connections", "destination_pending_reqs", "destination_inflight_reqs", + "destination_batch_size", "asynclog_requests", "proxy_reqs_processing", + "proxy_reqs_waiting", "client_queue_notify_period", + "ps_num_minor_faults", "ps_num_major_faults", + "ps_vsize", "ps_rss", "fibers_allocated", "fibers_pool_size", "fibers_stack_high_watermark", + "successful_client_connections", "duration_us", "destination_max_pending_reqs", + "destination_max_inflight_reqs", "retrans_per_kbyte_max", "cmd_get_count", "cmd_delete_out", + "cmd_lease_get", "cmd_set", "cmd_get_out_all", "cmd_get_out", "cmd_lease_set_count", + "cmd_other_out_all", "cmd_lease_get_out", "cmd_set_count", "cmd_lease_set_out", + "cmd_delete_count", "cmd_other", "cmd_delete", "cmd_get", "cmd_lease_set", "cmd_set_out", + "cmd_lease_get_count", "cmd_other_out", "cmd_lease_get_out_all", "cmd_set_out_all", + "cmd_other_count", "cmd_delete_out_all", "cmd_lease_set_out_all"} + + floatMetrics := []string{"rusage_system", "rusage_user", "ps_user_time_sec", "ps_system_time_sec"} + + for _, metric := range intMetrics { + assert.True(t, acc.HasInt64Field("mcrouter", metric), metric) + } + + for _, metric := range floatMetrics { + assert.True(t, acc.HasFloatField("mcrouter", metric), metric) + } +} + +func TestMcrouterParseMetrics(t *testing.T) { + r := bufio.NewReader(strings.NewReader(mcrouterStats)) + scanner := bufio.NewScanner(r) + values, err := parseResponse(scanner) + require.NoError(t, err, "Error parsing mcrouter response") + + tests := []struct { + key string + value string + }{ + {"uptime", "166"}, + {"num_servers", "1"}, + {"num_servers_new", "1"}, + {"num_servers_up", "0"}, + {"num_servers_down", "0"}, + {"num_servers_closed", "0"}, + {"num_clients", "1"}, + {"num_suspect_servers", "0"}, + {"destination_batches_sum", "0"}, + {"destination_requests_sum", "0"}, + {"outstanding_route_get_reqs_queued", "0"}, + {"outstanding_route_update_reqs_queued", "0"}, + {"outstanding_route_get_avg_queue_size", "0"}, + {"outstanding_route_update_avg_queue_size", "0"}, + {"outstanding_route_get_avg_wait_time_sec", "0"}, + {"outstanding_route_update_avg_wait_time_sec", "0"}, + {"retrans_closed_connections", "0"}, + {"destination_pending_reqs", "0"}, + {"destination_inflight_reqs", "0"}, + {"destination_batch_size", "0"}, + {"asynclog_requests", "0"}, + {"proxy_reqs_processing", "1"}, + {"proxy_reqs_waiting", "0"}, + {"client_queue_notify_period", "0"}, + {"rusage_system", "0.040966"}, + {"rusage_user", "0.020483"}, + {"ps_num_minor_faults", "2490"}, + {"ps_num_major_faults", "11"}, + {"ps_user_time_sec", "0.02"}, + {"ps_system_time_sec", "0.04"}, + {"ps_vsize", "697741312"}, + {"ps_rss", "10563584"}, + {"fibers_allocated", "0"}, + {"fibers_pool_size", "0"}, + {"fibers_stack_high_watermark", "0"}, + {"successful_client_connections", "18"}, + {"duration_us", "0"}, + {"destination_max_pending_reqs", "0"}, + {"destination_max_inflight_reqs", "0"}, + {"retrans_per_kbyte_max", "0"}, + {"cmd_get_count", "0"}, + {"cmd_delete_out", "0"}, + {"cmd_lease_get", "0"}, + {"cmd_set", "0"}, + {"cmd_get_out_all", "0"}, + {"cmd_get_out", "0"}, + {"cmd_lease_set_count", "0"}, + {"cmd_other_out_all", "0"}, + {"cmd_lease_get_out", "0"}, + {"cmd_set_count", "0"}, + {"cmd_lease_set_out", "0"}, + {"cmd_delete_count", "0"}, + {"cmd_other", "0"}, + {"cmd_delete", "0"}, + {"cmd_get", "0"}, + {"cmd_lease_set", "0"}, + {"cmd_set_out", "0"}, + {"cmd_lease_get_count", "0"}, + {"cmd_other_out", "0"}, + {"cmd_lease_get_out_all", "0"}, + {"cmd_set_out_all", "0"}, + {"cmd_other_count", "0"}, + {"cmd_delete_out_all", "0"}, + {"cmd_lease_set_out_all", "0"}, + } + + for _, test := range tests { + value, ok := values[test.key] + if !ok { + t.Errorf("Did not find key for metric %s in values", test.key) + continue + } + if value != test.value { + t.Errorf("Metric: %s, Expected: %s, actual: %s", + test.key, test.value, value) + } + } +} + +var mcrouterStats = `STAT version 36.0.0 mcrouter +STAT commandargs --port 11211 --config-file /etc/mcrouter/mcrouter.json --async-dir /var/spool/mcrouter --log-path /var/log/mcrouter/mcrouter.log --stats-root /var/mcrouter/stats --server-timeout 100 --reset-inactive-connection-interval 10000 --proxy-threads auto +STAT pid 21357 +STAT parent_pid 1 +STAT time 1524673265 +STAT uptime 166 +STAT num_servers 1 +STAT num_servers_new 1 +STAT num_servers_up 0 +STAT num_servers_down 0 +STAT num_servers_closed 0 +STAT num_clients 1 +STAT num_suspect_servers 0 +STAT destination_batches_sum 0 +STAT destination_requests_sum 0 +STAT outstanding_route_get_reqs_queued 0 +STAT outstanding_route_update_reqs_queued 0 +STAT outstanding_route_get_avg_queue_size 0 +STAT outstanding_route_update_avg_queue_size 0 +STAT outstanding_route_get_avg_wait_time_sec 0 +STAT outstanding_route_update_avg_wait_time_sec 0 +STAT retrans_closed_connections 0 +STAT destination_pending_reqs 0 +STAT destination_inflight_reqs 0 +STAT destination_batch_size 0 +STAT asynclog_requests 0 +STAT proxy_reqs_processing 1 +STAT proxy_reqs_waiting 0 +STAT client_queue_notify_period 0 +STAT rusage_system 0.040966 +STAT rusage_user 0.020483 +STAT ps_num_minor_faults 2490 +STAT ps_num_major_faults 11 +STAT ps_user_time_sec 0.02 +STAT ps_system_time_sec 0.04 +STAT ps_vsize 697741312 +STAT ps_rss 10563584 +STAT fibers_allocated 0 +STAT fibers_pool_size 0 +STAT fibers_stack_high_watermark 0 +STAT successful_client_connections 18 +STAT duration_us 0 +STAT destination_max_pending_reqs 0 +STAT destination_max_inflight_reqs 0 +STAT retrans_per_kbyte_max 0 +STAT cmd_get_count 0 +STAT cmd_delete_out 0 +STAT cmd_lease_get 0 +STAT cmd_set 0 +STAT cmd_get_out_all 0 +STAT cmd_get_out 0 +STAT cmd_lease_set_count 0 +STAT cmd_other_out_all 0 +STAT cmd_lease_get_out 0 +STAT cmd_set_count 0 +STAT cmd_lease_set_out 0 +STAT cmd_delete_count 0 +STAT cmd_other 0 +STAT cmd_delete 0 +STAT cmd_get 0 +STAT cmd_lease_set 0 +STAT cmd_set_out 0 +STAT cmd_lease_get_count 0 +STAT cmd_other_out 0 +STAT cmd_lease_get_out_all 0 +STAT cmd_set_out_all 0 +STAT cmd_other_count 0 +STAT cmd_delete_out_all 0 +STAT cmd_lease_set_out_all 0 +END +`