diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6876cfa7bd646..16749fcbc54eb 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -326,7 +326,7 @@ which would take some time to replicate. To overcome this situation we've decided to use docker containers to provide a fast and reproducible environment to test those services which require it. For other situations -(i.e: https://github.com/influxdata/telegraf/blob/master/plugins/redis/redis_test.go) +(i.e: https://github.com/influxdata/telegraf/blob/master/plugins/inputs/redis/redis_test.go) a simple mock will suffice. To execute Telegraf tests follow these simple steps: diff --git a/README.md b/README.md index 21a3445ea047e..c38890350cfdf 100644 --- a/README.md +++ b/README.md @@ -182,6 +182,7 @@ Currently implemented sources: * prometheus * puppetagent * rabbitmq +* raindrops * redis * rethinkdb * sql server (microsoft) diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index 335d41a329dec..639afbe099a8c 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -35,6 +35,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/prometheus" _ "github.com/influxdata/telegraf/plugins/inputs/puppetagent" _ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq" + _ "github.com/influxdata/telegraf/plugins/inputs/raindrops" _ "github.com/influxdata/telegraf/plugins/inputs/redis" _ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb" _ "github.com/influxdata/telegraf/plugins/inputs/sensors" diff --git a/plugins/inputs/raindrops/README.md b/plugins/inputs/raindrops/README.md new file mode 100644 index 0000000000000..6a73a085b62e1 --- /dev/null +++ b/plugins/inputs/raindrops/README.md @@ -0,0 +1,49 @@ +# Raindrops Input Plugin + +The [raindrops](http://raindrops.bogomips.org/) plugin reads from +specified raindops [middleware](http://raindrops.bogomips.org/Raindrops/Middleware.html) URI and adds stats to InfluxDB. + +### Configuration: + +```toml +# Read raindrops stats +[[inputs.raindrops]] + urls = ["http://localhost:8080/_raindrops"] +``` + +### Measurements & Fields: + +- raindrops + - calling (integer, count) + - writing (integer, count) +- raindrops_listen + - active (integer, bytes) + - queued (integer, bytes) + +### Tags: + +- Raindops calling/writing of all the workers: + - server + - port + +- raindrops_listen (ip:port): + - ip + - port + +- raindrops_listen (Unix Socket): + - socket + +### Example Output: + +``` +$ ./telegraf -config telegraf.conf -input-filter raindrops -test +* Plugin: raindrops, Collection 1 +> raindrops,port=8080,server=localhost calling=0i,writing=0i 1455479896806238204 +> raindrops_listen,ip=0.0.0.0,port=8080 active=0i,queued=0i 1455479896806561938 +> raindrops_listen,ip=0.0.0.0,port=8081 active=1i,queued=0i 1455479896806605749 +> raindrops_listen,ip=127.0.0.1,port=8082 active=0i,queued=0i 1455479896806646315 +> raindrops_listen,ip=0.0.0.0,port=8083 active=0i,queued=0i 1455479896806683252 +> raindrops_listen,ip=0.0.0.0,port=8084 active=0i,queued=0i 1455479896806712025 +> raindrops_listen,ip=0.0.0.0,port=3000 active=0i,queued=0i 1455479896806779197 +> raindrops_listen,socket=/tmp/listen.me active=0i,queued=0i 1455479896806813907 +``` diff --git a/plugins/inputs/raindrops/raindrops.go b/plugins/inputs/raindrops/raindrops.go new file mode 100644 index 0000000000000..572422f59dc7a --- /dev/null +++ b/plugins/inputs/raindrops/raindrops.go @@ -0,0 +1,184 @@ +package raindrops + +import ( + "bufio" + "fmt" + "net" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +type Raindrops struct { + Urls []string + http_client *http.Client +} + +var sampleConfig = ` + ### An array of raindrops middleware URI to gather stats. + urls = ["http://localhost:8080/_raindrops"] +` + +func (r *Raindrops) SampleConfig() string { + return sampleConfig +} + +func (r *Raindrops) Description() string { + return "Read raindrops stats (raindrops - real-time stats for preforking Rack servers)" +} + +func (r *Raindrops) Gather(acc telegraf.Accumulator) error { + var wg sync.WaitGroup + var outerr error + + for _, u := range r.Urls { + addr, err := url.Parse(u) + if err != nil { + return fmt.Errorf("Unable to parse address '%s': %s", u, err) + } + + wg.Add(1) + go func(addr *url.URL) { + defer wg.Done() + outerr = r.gatherUrl(addr, acc) + }(addr) + } + + wg.Wait() + + return outerr +} + +func (r *Raindrops) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error { + resp, err := r.http_client.Get(addr.String()) + if err != nil { + return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status) + } + buf := bufio.NewReader(resp.Body) + + // Calling + _, err = buf.ReadString(':') + if err != nil { + return err + } + line, err := buf.ReadString('\n') + if err != nil { + return err + } + calling, err := strconv.ParseUint(strings.TrimSpace(line), 10, 64) + if err != nil { + return err + } + + // Writing + _, err = buf.ReadString(':') + if err != nil { + return err + } + line, err = buf.ReadString('\n') + if err != nil { + return err + } + writing, err := strconv.ParseUint(strings.TrimSpace(line), 10, 64) + if err != nil { + return err + } + tags := r.getTags(addr) + fields := map[string]interface{}{ + "calling": calling, + "writing": writing, + } + acc.AddFields("raindrops", fields, tags) + + iterate := true + var queued_line_str string + var active_line_str string + var active_err error + var queued_err error + + for iterate { + // Listen + var tags map[string]string + + lis := map[string]interface{}{ + "active": 0, + "queued": 0, + } + active_line_str, active_err = buf.ReadString('\n') + if active_err != nil { + iterate = false + break + } + if strings.Compare(active_line_str, "\n") == 0 { + break + } + queued_line_str, queued_err = buf.ReadString('\n') + if queued_err != nil { + iterate = false + } + active_line := strings.Split(active_line_str, " ") + listen_name := active_line[0] + + active, err := strconv.ParseUint(strings.TrimSpace(active_line[2]), 10, 64) + if err != nil { + active = 0 + } + lis["active"] = active + + queued_line := strings.Split(queued_line_str, " ") + queued, err := strconv.ParseUint(strings.TrimSpace(queued_line[2]), 10, 64) + if err != nil { + queued = 0 + } + lis["queued"] = queued + if strings.Contains(listen_name, ":") { + listener := strings.Split(listen_name, ":") + tags = map[string]string{ + "ip": listener[0], + "port": listener[1], + } + + } else { + tags = map[string]string{ + "socket": listen_name, + } + } + acc.AddFields("raindrops_listen", lis, tags) + } + return nil +} + +// Get tag(s) for the raindrops calling/writing plugin +func (r *Raindrops) getTags(addr *url.URL) map[string]string { + h := addr.Host + host, port, err := net.SplitHostPort(h) + if err != nil { + host = addr.Host + if addr.Scheme == "http" { + port = "80" + } else if addr.Scheme == "https" { + port = "443" + } else { + port = "" + } + } + return map[string]string{"server": host, "port": port} +} + +func init() { + inputs.Add("raindrops", func() telegraf.Input { + return &Raindrops{http_client: &http.Client{Transport: &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + }}} + }) +} diff --git a/plugins/inputs/raindrops/raindrops_test.go b/plugins/inputs/raindrops/raindrops_test.go new file mode 100644 index 0000000000000..0dee9b1cc95ad --- /dev/null +++ b/plugins/inputs/raindrops/raindrops_test.go @@ -0,0 +1,107 @@ +package raindrops + +import ( + "fmt" + "net" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/influxdata/telegraf/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "time" +) + +const sampleResponse = ` +calling: 100 +writing: 200 +0.0.0.0:8080 active: 1 +0.0.0.0:8080 queued: 2 +0.0.0.0:8081 active: 3 +0.0.0.0:8081 queued: 4 +127.0.0.1:8082 active: 5 +127.0.0.1:8082 queued: 6 +0.0.0.0:8083 active: 7 +0.0.0.0:8083 queued: 8 +0.0.0.0:8084 active: 9 +0.0.0.0:8084 queued: 10 +0.0.0.0:3000 active: 11 +0.0.0.0:3000 queued: 12 +/tmp/listen.me active: 13 +/tmp/listen.me queued: 14 +` + +// Verify that raindrops tags are properly parsed based on the server +func TestRaindropsTags(t *testing.T) { + urls := []string{"http://localhost/_raindrops", "http://localhost:80/_raindrops"} + var addr *url.URL + r := &Raindrops{} + for _, url1 := range urls { + addr, _ = url.Parse(url1) + tagMap := r.getTags(addr) + assert.Contains(t, tagMap["server"], "localhost") + } +} + +func TestRaindropsGeneratesMetrics(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rsp string + + if r.URL.Path == "/_raindrops" { + rsp = sampleResponse + } else { + panic("Cannot handle request") + } + + fmt.Fprintln(w, rsp) + })) + defer ts.Close() + + n := &Raindrops{ + Urls: []string{fmt.Sprintf("%s/_raindrops", ts.URL)}, + http_client: &http.Client{Transport: &http.Transport{ + ResponseHeaderTimeout: time.Duration(3 * time.Second), + }}, + } + + var acc testutil.Accumulator + + err := n.Gather(&acc) + require.NoError(t, err) + + fields := map[string]interface{}{ + "calling": uint64(100), + "writing": uint64(200), + } + addr, err := url.Parse(ts.URL) + if err != nil { + panic(err) + } + + host, port, err := net.SplitHostPort(addr.Host) + if err != nil { + host = addr.Host + if addr.Scheme == "http" { + port = "80" + } else if addr.Scheme == "https" { + port = "443" + } else { + port = "" + } + } + + tags := map[string]string{"server": host, "port": port} + acc.AssertContainsTaggedFields(t, "raindrops", fields, tags) + + tags = map[string]string{ + "port": "8081", + "ip": "0.0.0.0", + } + fields = map[string]interface{}{ + "active": uint64(3), + "queued": uint64(4), + } + acc.AssertContainsTaggedFields(t, "raindrops_listen", fields, tags) +}