Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raindrops input for Unicorn server monitoring #695

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ which would take some time to replicate.
To overcome this situation we've decided to use docker containers to provide a
fast and reproducible environment to test those services which require it.
For other situations
(i.e: https://github.com/influxdata/telegraf/blob/master/plugins/redis/redis_test.go)
(i.e: https://github.com/influxdata/telegraf/blob/master/plugins/inputs/redis/redis_test.go)
a simple mock will suffice.

To execute Telegraf tests follow these simple steps:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ Currently implemented sources:
* prometheus
* puppetagent
* rabbitmq
* raindrops
* redis
* rethinkdb
* sql server (microsoft)
Expand Down
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/prometheus"
_ "github.com/influxdata/telegraf/plugins/inputs/puppetagent"
_ "github.com/influxdata/telegraf/plugins/inputs/rabbitmq"
_ "github.com/influxdata/telegraf/plugins/inputs/raindrops"
_ "github.com/influxdata/telegraf/plugins/inputs/redis"
_ "github.com/influxdata/telegraf/plugins/inputs/rethinkdb"
_ "github.com/influxdata/telegraf/plugins/inputs/sensors"
Expand Down
49 changes: 49 additions & 0 deletions plugins/inputs/raindrops/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Raindrops Input Plugin

The [raindrops](http://raindrops.bogomips.org/) plugin reads from
specified raindops [middleware](http://raindrops.bogomips.org/Raindrops/Middleware.html) URI and adds stats to InfluxDB.

### Configuration:

```toml
# Read raindrops stats
[[inputs.raindrops]]
urls = ["http://localhost:8080/_raindrops"]
```

### Measurements & Fields:

- raindrops
- calling (integer, count)
- writing (integer, count)
- raindrops_listen
- active (integer, bytes)
- queued (integer, bytes)

### Tags:

- Raindops calling/writing of all the workers:
- server
- port

- raindrops_listen (ip:port):
- ip
- port

- raindrops_listen (Unix Socket):
- socket

### Example Output:

```
$ ./telegraf -config telegraf.conf -input-filter raindrops -test
* Plugin: raindrops, Collection 1
> raindrops,port=8080,server=localhost calling=0i,writing=0i 1455479896806238204
> raindrops_listen,ip=0.0.0.0,port=8080 active=0i,queued=0i 1455479896806561938
> raindrops_listen,ip=0.0.0.0,port=8081 active=1i,queued=0i 1455479896806605749
> raindrops_listen,ip=127.0.0.1,port=8082 active=0i,queued=0i 1455479896806646315
> raindrops_listen,ip=0.0.0.0,port=8083 active=0i,queued=0i 1455479896806683252
> raindrops_listen,ip=0.0.0.0,port=8084 active=0i,queued=0i 1455479896806712025
> raindrops_listen,ip=0.0.0.0,port=3000 active=0i,queued=0i 1455479896806779197
> raindrops_listen,socket=/tmp/listen.me active=0i,queued=0i 1455479896806813907
```
184 changes: 184 additions & 0 deletions plugins/inputs/raindrops/raindrops.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package raindrops

import (
"bufio"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)

type Raindrops struct {
Urls []string
http_client *http.Client
}

var sampleConfig = `
### An array of raindrops middleware URI to gather stats.
urls = ["http://localhost:8080/_raindrops"]
`

func (r *Raindrops) SampleConfig() string {
return sampleConfig
}

func (r *Raindrops) Description() string {
return "Read raindrops stats (raindrops - real-time stats for preforking Rack servers)"
}

func (r *Raindrops) Gather(acc telegraf.Accumulator) error {
var wg sync.WaitGroup
var outerr error

for _, u := range r.Urls {
addr, err := url.Parse(u)
if err != nil {
return fmt.Errorf("Unable to parse address '%s': %s", u, err)
}

wg.Add(1)
go func(addr *url.URL) {
defer wg.Done()
outerr = r.gatherUrl(addr, acc)
}(addr)
}

wg.Wait()

return outerr
}

func (r *Raindrops) gatherUrl(addr *url.URL, acc telegraf.Accumulator) error {
resp, err := r.http_client.Get(addr.String())
if err != nil {
return fmt.Errorf("error making HTTP request to %s: %s", addr.String(), err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("%s returned HTTP status %s", addr.String(), resp.Status)
}
buf := bufio.NewReader(resp.Body)

// Calling
_, err = buf.ReadString(':')
if err != nil {
return err
}
line, err := buf.ReadString('\n')
if err != nil {
return err
}
calling, err := strconv.ParseUint(strings.TrimSpace(line), 10, 64)
if err != nil {
return err
}

// Writing
_, err = buf.ReadString(':')
if err != nil {
return err
}
line, err = buf.ReadString('\n')
if err != nil {
return err
}
writing, err := strconv.ParseUint(strings.TrimSpace(line), 10, 64)
if err != nil {
return err
}
tags := r.getTags(addr)
fields := map[string]interface{}{
"calling": calling,
"writing": writing,
}
acc.AddFields("raindrops", fields, tags)

iterate := true
var queued_line_str string
var active_line_str string
var active_err error
var queued_err error

for iterate {
// Listen
var tags map[string]string

lis := map[string]interface{}{
"active": 0,
"queued": 0,
}
active_line_str, active_err = buf.ReadString('\n')
if active_err != nil {
iterate = false
break
}
if strings.Compare(active_line_str, "\n") == 0 {
break
}
queued_line_str, queued_err = buf.ReadString('\n')
if queued_err != nil {
iterate = false
}
active_line := strings.Split(active_line_str, " ")
listen_name := active_line[0]

active, err := strconv.ParseUint(strings.TrimSpace(active_line[2]), 10, 64)
if err != nil {
active = 0
}
lis["active"] = active

queued_line := strings.Split(queued_line_str, " ")
queued, err := strconv.ParseUint(strings.TrimSpace(queued_line[2]), 10, 64)
if err != nil {
queued = 0
}
lis["queued"] = queued
if strings.Contains(listen_name, ":") {
listener := strings.Split(listen_name, ":")
tags = map[string]string{
"ip": listener[0],
"port": listener[1],
}

} else {
tags = map[string]string{
"socket": listen_name,
}
}
acc.AddFields("raindrops_listen", lis, tags)
}
return nil
}

// Get tag(s) for the raindrops calling/writing plugin
func (r *Raindrops) getTags(addr *url.URL) map[string]string {
h := addr.Host
host, port, err := net.SplitHostPort(h)
if err != nil {
host = addr.Host
if addr.Scheme == "http" {
port = "80"
} else if addr.Scheme == "https" {
port = "443"
} else {
port = ""
}
}
return map[string]string{"server": host, "port": port}
}

func init() {
inputs.Add("raindrops", func() telegraf.Input {
return &Raindrops{http_client: &http.Client{Transport: &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}}}
})
}
107 changes: 107 additions & 0 deletions plugins/inputs/raindrops/raindrops_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package raindrops

import (
"fmt"
"net"
"net/http"
"net/http/httptest"
"net/url"
"testing"

"github.com/influxdata/telegraf/testutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"time"
)

const sampleResponse = `
calling: 100
writing: 200
0.0.0.0:8080 active: 1
0.0.0.0:8080 queued: 2
0.0.0.0:8081 active: 3
0.0.0.0:8081 queued: 4
127.0.0.1:8082 active: 5
127.0.0.1:8082 queued: 6
0.0.0.0:8083 active: 7
0.0.0.0:8083 queued: 8
0.0.0.0:8084 active: 9
0.0.0.0:8084 queued: 10
0.0.0.0:3000 active: 11
0.0.0.0:3000 queued: 12
/tmp/listen.me active: 13
/tmp/listen.me queued: 14
`

// Verify that raindrops tags are properly parsed based on the server
func TestRaindropsTags(t *testing.T) {
urls := []string{"http://localhost/_raindrops", "http://localhost:80/_raindrops"}
var addr *url.URL
r := &Raindrops{}
for _, url1 := range urls {
addr, _ = url.Parse(url1)
tagMap := r.getTags(addr)
assert.Contains(t, tagMap["server"], "localhost")
}
}

func TestRaindropsGeneratesMetrics(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var rsp string

if r.URL.Path == "/_raindrops" {
rsp = sampleResponse
} else {
panic("Cannot handle request")
}

fmt.Fprintln(w, rsp)
}))
defer ts.Close()

n := &Raindrops{
Urls: []string{fmt.Sprintf("%s/_raindrops", ts.URL)},
http_client: &http.Client{Transport: &http.Transport{
ResponseHeaderTimeout: time.Duration(3 * time.Second),
}},
}

var acc testutil.Accumulator

err := n.Gather(&acc)
require.NoError(t, err)

fields := map[string]interface{}{
"calling": uint64(100),
"writing": uint64(200),
}
addr, err := url.Parse(ts.URL)
if err != nil {
panic(err)
}

host, port, err := net.SplitHostPort(addr.Host)
if err != nil {
host = addr.Host
if addr.Scheme == "http" {
port = "80"
} else if addr.Scheme == "https" {
port = "443"
} else {
port = ""
}
}

tags := map[string]string{"server": host, "port": port}
acc.AssertContainsTaggedFields(t, "raindrops", fields, tags)

tags = map[string]string{
"port": "8081",
"ip": "0.0.0.0",
}
fields = map[string]interface{}{
"active": uint64(3),
"queued": uint64(4),
}
acc.AssertContainsTaggedFields(t, "raindrops_listen", fields, tags)
}