diff --git a/plugins/inputs/all/all.go b/plugins/inputs/all/all.go index add2181a520dc..d105b65a3050a 100644 --- a/plugins/inputs/all/all.go +++ b/plugins/inputs/all/all.go @@ -82,6 +82,7 @@ import ( _ "github.com/influxdata/telegraf/plugins/inputs/twemproxy" _ "github.com/influxdata/telegraf/plugins/inputs/udp_listener" _ "github.com/influxdata/telegraf/plugins/inputs/varnish" + _ "github.com/influxdata/telegraf/plugins/inputs/wavefront" _ "github.com/influxdata/telegraf/plugins/inputs/webhooks" _ "github.com/influxdata/telegraf/plugins/inputs/win_perf_counters" _ "github.com/influxdata/telegraf/plugins/inputs/zfs" diff --git a/plugins/inputs/wavefront/wavefront.go b/plugins/inputs/wavefront/wavefront.go new file mode 100644 index 0000000000000..465558fe11e5c --- /dev/null +++ b/plugins/inputs/wavefront/wavefront.go @@ -0,0 +1,250 @@ +package wavefront + +import ( + "bufio" + "log" + "net" + "regexp" + "strconv" + "strings" + "time" + "unicode" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/inputs" +) + +const ( + defaultAllowPending = 10000 +) + +type metric struct { + name string + fields map[string]interface{} + tags map[string]string + times []time.Time +} + +type Wavefront struct { + Address string `toml:"address"` + + AllowedPendingMetrics int `toml:"allowed_pending"` + + // tracks the number of dropped metrics. + drops int + + // Channel for all incoming wavefront packets + in chan string + out chan metric + + done chan bool + + serverActive chan bool // For testing purposes to determine if it's listening +} + +var sampleConfig = ` + ## Address + address = 10.169.255.100:2878 +` + +// SampleConfig returns a sample configuration block +func (w *Wavefront) SampleConfig() string { + return sampleConfig +} + +// Description just returns a short description of the Mesos plugin +func (w *Wavefront) Description() string { + return "Telegraf input plugin for gathering metrics from sources using wavefront format" +} + +func (w *Wavefront) SetDefaults() { + if w.Address == "" { + log.Println("I! [wavefront] Missing address value, setting default value (10.169.255.100)") + w.Address = "10.169.255.100" + } +} + +func (w *Wavefront) parser() error { + for { + select { + case metricLine := <-w.in: + // Convert all multiple spaces and tabs to + r, _ := regexp.Compile("[ |\t]+") + metricLine = strings.TrimSpace(string(r.ReplaceAll([]byte(metricLine), []byte(" ")))) + split := splitOnSpacesNotInQuotes(metricLine) + // Scrub invalid values + if split[1] == "nan" || split[1] == "Infinity" || split[1] == "null" || split[1] == "NaN" { + continue + } + + var metricTimes []time.Time + tagIdx := 3 // Assumes there is a timestamp on the metric + unixSeconds, err := strconv.ParseInt(split[2], 10, 64) + // If it cannot be parsed then it is assumed that there is no timestamp and it is a tag instead + if err != nil { + tagIdx = 2 + } else { + metricTimes = append(metricTimes, time.Unix(unixSeconds, int64(0))) + } + + tags := make(map[string]string) + for _, tagStr := range split[tagIdx:] { + tagStr = strings.Replace(tagStr, "\"", "", -1) + tagIdx := strings.Index(tagStr, "=") + if tagIdx == -1 { + log.Printf("I! Malformed tag on metric: %v\n", tagStr) + continue + } + tags[tagStr[:tagIdx]] = tagStr[tagIdx+1:] + } + + splitName := strings.Split(split[0], ".") + if len(splitName) < 2 { + log.Printf("I! Metric name is not namespaced. Skipping... %v\n", split[0]) + continue + } + + value, isNumeric := convertToNumeric(split[1]) + // This would imply an invalid wavefront metric because they do not handle anything but strings + if !isNumeric { + continue + } + + w.out <- metric{ + name: splitName[0], + fields: map[string]interface{}{ + strings.Join(splitName[1:], "."): value, + }, + tags: tags, + times: metricTimes, + } + } + } +} + +// Gather() metrics +func (w *Wavefront) Gather(acc telegraf.Accumulator) error { +LOOP: + for { + select { + case m := <-w.out: + acc.AddFields(m.name, m.fields, m.tags, m.times...) + default: + break LOOP + } + } + return nil +} + +func (w *Wavefront) Start(_ telegraf.Accumulator) error { + log.Printf("I! Started the wavefront service on %s\n", w.Address) + // Start the UDP listener + go w.listen() + // Start the line parser + go w.parser() + return nil +} + +func (w *Wavefront) Stop() { + w.done <- true +} + +func init() { + inputs.Add("wavefront", func() telegraf.Input { + return &Wavefront{ + AllowedPendingMetrics: defaultAllowPending, + in: make(chan string, defaultAllowPending), + out: make(chan metric, defaultAllowPending), + done: make(chan bool, 10), + serverActive: make(chan bool, 1), + Address: "10.169.100.100", + } + }) +} + +func (w *Wavefront) handleClient(conn net.Conn) { + defer conn.Close() + reader := bufio.NewReader(conn) + for { + wfMetric, err := reader.ReadString('\n') + if err != nil { + return + } + + select { + case w.in <- strings.TrimSpace(wfMetric): + default: + w.drops++ + if w.drops != 0 { + log.Printf("I! has dropped this many metrics: %v\n", w.drops) + } + } + } +} + +func (w *Wavefront) listen() { + tcpAddr, err := net.ResolveTCPAddr("tcp4", w.Address) + if err != nil { + panic(err) + } + l, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + panic(err) + } + + // Close the listener when the application closes. + defer l.Close() + log.Printf("I! Listening on %v\n", w.Address) + w.serverActive <- true + acceptChan := make(chan bool, 1) + acceptChan <- true +LISTENER: + for { + select { + case <-acceptChan: + // Listen for an incoming connection. + conn, err := l.Accept() + if err != nil { + log.Printf("E! Error accepting new metrics: %s\n", err.Error()) + } + go w.handleClient(conn) + acceptChan <- true + case <-w.done: + log.Printf("I! Stopping listener\n") + break LISTENER + } + } +} + +// Converts string values taken from aurora vars to numeric values for wavefront +func convertToNumeric(value string) (interface{}, bool) { + var err error + var val interface{} + if val, err = strconv.ParseFloat(value, 64); err == nil { + return val, true + } + if val, err = strconv.ParseBool(value); err != nil { + return val.(bool), false + } + return val, true +} + +func splitOnSpacesNotInQuotes(wfString string) []string { + lastQuote := rune(0) + f := func(c rune) bool { + switch { + case c == lastQuote: + lastQuote = rune(0) + return false + case lastQuote != rune(0): + return false + case unicode.In(c, unicode.Quotation_Mark): + lastQuote = c + return false + default: + return unicode.IsSpace(c) + + } + } + return strings.FieldsFunc(wfString, f) +} diff --git a/plugins/inputs/wavefront/wavefront_test.go b/plugins/inputs/wavefront/wavefront_test.go new file mode 100644 index 0000000000000..1b616cc40b0a9 --- /dev/null +++ b/plugins/inputs/wavefront/wavefront_test.go @@ -0,0 +1,139 @@ +package wavefront + +import ( + "log" + "net" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/influxdata/telegraf/testutil" +) + +const testServer = "localhost:1099" + +var metrics = `docker.n.images 30 1496156870 engine_host="fib-r10-u05" source="fib-r10-u05" +test1.n.used.file.descriptors 26 1496156870 engine_host="fib-r10-u10" source="fib-r10-u10" +` + +var individualMetrics = []string{ + `docker.n.images 30 1496156870 engine_host="fib-r10-u05" source="fib-r10-u05"`, + `test1.n.used.file.descriptors 26 1496156870 engine_host="fib-r10-u10" source="fib-r10-u10"`, +} + +var dockerFields = map[string]interface{}{ + "n.images": "30", +} + +var dockerTags = map[string]string{ + "engine_host": "fib-r10-u05", + "source": "fib-r10-u05", +} + +var test1Fields = map[string]interface{}{ + "n.used.file.descriptors": "26", +} + +var test1Tags = map[string]string{ + "engine_host": "fib-r10-u10", + "source": "fib-r10-u10", +} + +var metricStructs = []metric{ + metric{ + name: "docker", + fields: dockerFields, + tags: dockerTags, + times: []time.Time{ + time.Unix(int64(1496156870), int64(0)), + }, + }, + metric{ + name: "test1", + fields: test1Fields, + tags: test1Tags, + times: []time.Time{ + time.Unix(int64(1496156870), int64(0)), + }, + }, +} + +func sendMetric(metric string) { + tcpAddr, err := net.ResolveTCPAddr("tcp", testServer) + if err != nil { + log.Fatalf("ResolveTCPAddr failed: %v", err) + } + + conn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + log.Fatalf("Could not dial wavefront input: %v", err) + } + + if _, err = conn.Write([]byte(metric)); err != nil { + log.Fatalf("Could not write metric to server: %v", err) + } +} + +func TestListen(t *testing.T) { + w := &Wavefront{ + in: make(chan string, 2), + out: make(chan metric, 2), + done: make(chan bool, 2), + serverActive: make(chan bool, 1), + Address: testServer, + } + // Launch server + go w.listen() + + // Wait for the server to go active + <-w.serverActive + + // Dial the wavefront server and send the metrics + sendMetric(metrics) + // Make sure all metrics are as expected + for _, metric := range individualMetrics { + assert.Equal(t, metric, <-w.in) + } + // Gracefully shutdown server for next test + w.done <- true +} + +func TestParser(t *testing.T) { + w := &Wavefront{ + in: make(chan string, 2), + out: make(chan metric, 2), + done: make(chan bool, 2), + serverActive: make(chan bool, 1), + Address: testServer, + } + for _, metric := range individualMetrics { + w.in <- metric + } + go w.parser() + + for _, metric := range metricStructs { + assert.Equal(t, metric, <-w.out) + } +} + +func TestGather(t *testing.T) { + w := &Wavefront{ + in: make(chan string, 2), + out: make(chan metric, 2), + done: make(chan bool, 2), + serverActive: make(chan bool, 1), + Address: testServer, + } + + var acc testutil.Accumulator + go w.Gather(&acc) + for _, metric := range metricStructs { + w.out <- metric + } + + acc.Wait(2) + acc.AssertContainsTaggedFields(t, "docker", dockerFields, dockerTags) + acc.AssertContainsTaggedFields(t, "test1", test1Fields, test1Tags) + +}