Skip to content

Commit

Permalink
Add Wavefront service input plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Connor Gorman committed Jun 21, 2017
1 parent 2305830 commit e723f5d
Show file tree
Hide file tree
Showing 3 changed files with 390 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/twemproxy"
_ "github.com/influxdata/telegraf/plugins/inputs/udp_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/varnish"
_ "github.com/influxdata/telegraf/plugins/inputs/wavefront"
_ "github.com/influxdata/telegraf/plugins/inputs/webhooks"
_ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters"
_ "github.com/influxdata/telegraf/plugins/inputs/zfs"
Expand Down
250 changes: 250 additions & 0 deletions plugins/inputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
package wavefront

import (
"bufio"
"log"
"net"
"regexp"
"strconv"
"strings"
"time"
"unicode"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)

const (
defaultAllowPending = 10000
)

type metric struct {
name string
fields map[string]interface{}
tags map[string]string
times []time.Time
}

type Wavefront struct {
Address string `toml:"address"`

AllowedPendingMetrics int `toml:"allowed_pending"`

// tracks the number of dropped metrics.
drops int

// Channel for all incoming wavefront packets
in chan string
out chan metric

done chan bool

serverActive chan bool // For testing purposes to determine if it's listening
}

var sampleConfig = `
## Address
address = 10.169.255.100:2878
`

// SampleConfig returns a sample configuration block
func (w *Wavefront) SampleConfig() string {
return sampleConfig
}

// Description just returns a short description of the Mesos plugin
func (w *Wavefront) Description() string {
return "Telegraf input plugin for gathering metrics from sources using wavefront format"
}

func (w *Wavefront) SetDefaults() {
if w.Address == "" {
log.Println("I! [wavefront] Missing address value, setting default value (10.169.255.100)")
w.Address = "10.169.255.100"
}
}

func (w *Wavefront) parser() error {
for {
select {
case metricLine := <-w.in:
// Convert all multiple spaces and tabs to
r, _ := regexp.Compile("[ |\t]+")
metricLine = strings.TrimSpace(string(r.ReplaceAll([]byte(metricLine), []byte(" "))))
split := splitOnSpacesNotInQuotes(metricLine)
// Scrub invalid values
if split[1] == "nan" || split[1] == "Infinity" || split[1] == "null" || split[1] == "NaN" {
continue
}

var metricTimes []time.Time
tagIdx := 3 // Assumes there is a timestamp on the metric
unixSeconds, err := strconv.ParseInt(split[2], 10, 64)
// If it cannot be parsed then it is assumed that there is no timestamp and it is a tag instead
if err != nil {
tagIdx = 2
} else {
metricTimes = append(metricTimes, time.Unix(unixSeconds, int64(0)))
}

tags := make(map[string]string)
for _, tagStr := range split[tagIdx:] {
tagStr = strings.Replace(tagStr, "\"", "", -1)
tagIdx := strings.Index(tagStr, "=")
if tagIdx == -1 {
log.Printf("I! Malformed tag on metric: %v\n", tagStr)
continue
}
tags[tagStr[:tagIdx]] = tagStr[tagIdx+1:]
}

splitName := strings.Split(split[0], ".")
if len(splitName) < 2 {
log.Printf("I! Metric name is not namespaced. Skipping... %v\n", split[0])
continue
}

value, isNumeric := convertToNumeric(split[1])
// This would imply an invalid wavefront metric because they do not handle anything but strings
if !isNumeric {
continue
}

w.out <- metric{
name: splitName[0],
fields: map[string]interface{}{
strings.Join(splitName[1:], "."): value,
},
tags: tags,
times: metricTimes,
}
}
}
}

// Gather() metrics
func (w *Wavefront) Gather(acc telegraf.Accumulator) error {
LOOP:
for {
select {
case m := <-w.out:
acc.AddFields(m.name, m.fields, m.tags, m.times...)
default:
break LOOP
}
}
return nil
}

func (w *Wavefront) Start(_ telegraf.Accumulator) error {
log.Printf("I! Started the wavefront service on %s\n", w.Address)
// Start the UDP listener
go w.listen()
// Start the line parser
go w.parser()
return nil
}

func (w *Wavefront) Stop() {
w.done <- true
}

func init() {
inputs.Add("wavefront", func() telegraf.Input {
return &Wavefront{
AllowedPendingMetrics: defaultAllowPending,
in: make(chan string, defaultAllowPending),
out: make(chan metric, defaultAllowPending),
done: make(chan bool, 10),
serverActive: make(chan bool, 1),
Address: "10.169.100.100",
}
})
}

func (w *Wavefront) handleClient(conn net.Conn) {
defer conn.Close()
reader := bufio.NewReader(conn)
for {
wfMetric, err := reader.ReadString('\n')
if err != nil {
return
}

select {
case w.in <- strings.TrimSpace(wfMetric):
default:
w.drops++
if w.drops != 0 {
log.Printf("I! has dropped this many metrics: %v\n", w.drops)
}
}
}
}

func (w *Wavefront) listen() {
tcpAddr, err := net.ResolveTCPAddr("tcp4", w.Address)
if err != nil {
panic(err)
}
l, err := net.ListenTCP("tcp", tcpAddr)
if err != nil {
panic(err)
}

// Close the listener when the application closes.
defer l.Close()
log.Printf("I! Listening on %v\n", w.Address)
w.serverActive <- true
acceptChan := make(chan bool, 1)
acceptChan <- true
LISTENER:
for {
select {
case <-acceptChan:
// Listen for an incoming connection.
conn, err := l.Accept()
if err != nil {
log.Printf("E! Error accepting new metrics: %s\n", err.Error())
}
go w.handleClient(conn)
acceptChan <- true
case <-w.done:
log.Printf("I! Stopping listener\n")
break LISTENER
}
}
}

// Converts string values taken from aurora vars to numeric values for wavefront
func convertToNumeric(value string) (interface{}, bool) {
var err error
var val interface{}
if val, err = strconv.ParseFloat(value, 64); err == nil {
return val, true
}
if val, err = strconv.ParseBool(value); err != nil {
return val.(bool), false
}
return val, true
}

func splitOnSpacesNotInQuotes(wfString string) []string {
lastQuote := rune(0)
f := func(c rune) bool {
switch {
case c == lastQuote:
lastQuote = rune(0)
return false
case lastQuote != rune(0):
return false
case unicode.In(c, unicode.Quotation_Mark):
lastQuote = c
return false
default:
return unicode.IsSpace(c)

}
}
return strings.FieldsFunc(wfString, f)
}
139 changes: 139 additions & 0 deletions plugins/inputs/wavefront/wavefront_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package wavefront

import (
"log"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/influxdata/telegraf/testutil"
)

const testServer = "localhost:1099"

var metrics = `docker.n.images 30 1496156870 engine_host="fib-r10-u05" source="fib-r10-u05"
test1.n.used.file.descriptors 26 1496156870 engine_host="fib-r10-u10" source="fib-r10-u10"
`

var individualMetrics = []string{
`docker.n.images 30 1496156870 engine_host="fib-r10-u05" source="fib-r10-u05"`,
`test1.n.used.file.descriptors 26 1496156870 engine_host="fib-r10-u10" source="fib-r10-u10"`,
}

var dockerFields = map[string]interface{}{
"n.images": "30",
}

var dockerTags = map[string]string{
"engine_host": "fib-r10-u05",
"source": "fib-r10-u05",
}

var test1Fields = map[string]interface{}{
"n.used.file.descriptors": "26",
}

var test1Tags = map[string]string{
"engine_host": "fib-r10-u10",
"source": "fib-r10-u10",
}

var metricStructs = []metric{
metric{
name: "docker",
fields: dockerFields,
tags: dockerTags,
times: []time.Time{
time.Unix(int64(1496156870), int64(0)),
},
},
metric{
name: "test1",
fields: test1Fields,
tags: test1Tags,
times: []time.Time{
time.Unix(int64(1496156870), int64(0)),
},
},
}

func sendMetric(metric string) {
tcpAddr, err := net.ResolveTCPAddr("tcp", testServer)
if err != nil {
log.Fatalf("ResolveTCPAddr failed: %v", err)
}

conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Fatalf("Could not dial wavefront input: %v", err)
}

if _, err = conn.Write([]byte(metric)); err != nil {
log.Fatalf("Could not write metric to server: %v", err)
}
}

func TestListen(t *testing.T) {
w := &Wavefront{
in: make(chan string, 2),
out: make(chan metric, 2),
done: make(chan bool, 2),
serverActive: make(chan bool, 1),
Address: testServer,
}
// Launch server
go w.listen()

// Wait for the server to go active
<-w.serverActive

// Dial the wavefront server and send the metrics
sendMetric(metrics)
// Make sure all metrics are as expected
for _, metric := range individualMetrics {
assert.Equal(t, metric, <-w.in)
}
// Gracefully shutdown server for next test
w.done <- true
}

func TestParser(t *testing.T) {
w := &Wavefront{
in: make(chan string, 2),
out: make(chan metric, 2),
done: make(chan bool, 2),
serverActive: make(chan bool, 1),
Address: testServer,
}
for _, metric := range individualMetrics {
w.in <- metric
}
go w.parser()

for _, metric := range metricStructs {
assert.Equal(t, metric, <-w.out)
}
}

func TestGather(t *testing.T) {
w := &Wavefront{
in: make(chan string, 2),
out: make(chan metric, 2),
done: make(chan bool, 2),
serverActive: make(chan bool, 1),
Address: testServer,
}

var acc testutil.Accumulator
go w.Gather(&acc)
for _, metric := range metricStructs {
w.out <- metric
}

acc.Wait(2)
acc.AssertContainsTaggedFields(t, "docker", dockerFields, dockerTags)
acc.AssertContainsTaggedFields(t, "test1", test1Fields, test1Tags)

}

0 comments on commit e723f5d

Please sign in to comment.