Skip to content

Commit

Permalink
Add wavefront plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
Connor Gorman committed May 9, 2017
1 parent 5af985e commit cc1b593
Show file tree
Hide file tree
Showing 4 changed files with 591 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/riemann"
_ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy"
_ "github.com/influxdata/telegraf/plugins/outputs/socket_writer"
_ "github.com/influxdata/telegraf/plugins/outputs/wavefront"
)
117 changes: 117 additions & 0 deletions plugins/outputs/wavefront/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# Wavefront Output Plugin

This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefront data format over TCP.


## Wavefront Data format

The expected input for Wavefront is specified in the following way:

```
<metric> <value> [<timestamp>] <source|host>=<soureTagValue> [tagk1=tagv1 ...tagkN=tagvN]
```

More information about the Wavefront data format is available [here](https://community.wavefront.com/docs/DOC-1031)


By default, to ease Metrics browsing in the Wavefront UI, metrics are grouped by converting any `_` characters to `.` in the final name.
This behavior can be altered by changing the `metric_separator` and/or the `convert_paths` settings.
Most illegal characters in the metric name are automatically converted to `-`.
The `use_regex` setting can be used to ensure all illegal characters are properly handled, but can lead to performance degradation.

## Configuration:

```toml
# Configuration for Wavefront output
[[outputs.wavefront]]
## prefix for metrics keys
prefix = "my.specific.prefix."

## DNS name of the wavefront proxy server
host = "wavefront.example.com"

## Port that the Wavefront proxy server listens on
port = 2878

## wether to use "value" for name of simple fields
simple_fields = false

## character to use between metric and field name. defaults to . (dot)
metric_separator = "."

## Convert metric name paths to use metricSeperator character
## When true (default) will convert all _ (underscore) chartacters in final metric name
convert_paths = true

## Use Regex to sanitize metric and tag names from invalid characters
## Regex is more thorough, but significantly slower
use_regex = false

## point tags to use as the source name for Wavefront (if none found, host will be used)
source_override = ["hostname", "snmp_host", "node_host"]

## Print additional debug information requires debug = true at the agent level
debug_all = false
```

Parameters:

Prefix string
Host string
Port int
SimpleFields bool
MetricSeparator string
ConvertPaths bool
UseRegex bool
SourceOverride string
DebugAll bool

* `prefix`: String to use as a prefix for all sent metrics.
* `host`: Name of Wavefront proxy server
* `port`: Port that Wavefront proxy server is configured for `pushListenerPorts`
* `simple_fields`: if false (default) metric field names called `value` are converted to empty strings
* `metric_separator`: character to use to separate metric and field names. (default is `_`)
* `convert_paths`: if true (default) will convert all `_` in metric and field names to `metric_seperator`
* `use_regex`: if true (default is false) will use regex to ensure all illegal characters are converted to `-`. Regex is much slower than the default mode which will catch most illegal characters. Use with caution.
* `source_override`: ordered list of point tags to use as the source name for Wavefront. Once a match is found, that tag is used as the source for that point. If no tags are found the host tag will be used.
* `debug_all`: Will output additional debug information. Requires `debug = true` to be configured at the agent level


##

The Wavefront proxy interface can be simulated with this reader:

```
// wavefront_proxy_mock.go
package main
import (
"io"
"log"
"net"
"os"
)
func main() {
l, err := net.Listen("tcp", "localhost:2878")
if err != nil {
log.Fatal(err)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
log.Fatal(err)
}
go func(c net.Conn) {
defer c.Close()
io.Copy(os.Stdout, c)
}(conn)
}
}
```

## Allowed values for metrics

Wavefront allows `integers` and `floats` as input values
262 changes: 262 additions & 0 deletions plugins/outputs/wavefront/wavefront.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
package wavefront

import (
"fmt"
"net"
"regexp"
"sort"
"strconv"
"strings"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"log"
)

type Wavefront struct {
Prefix string
Host string
Port int
SimpleFields bool
MetricSeparator string
ConvertPaths bool
UseRegex bool
SourceOverride []string
DebugAll bool
}

// catch many of the invalid chars that could appear in a metric or tag name
var sanitizedChars = strings.NewReplacer(
"!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-",
"*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-",
"[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-",
">", "-", ",", "-", "?", "-", "/", "-", "\\", "-", "|", "-", " ", "-",
)

// instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer
var sanitizedRegex, _ = regexp.Compile("[^a-zA-Z\\d_.-]")

var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-")

var pathReplacer = strings.NewReplacer("_", "_")

var sampleConfig = `
## prefix for metrics keys
#prefix = "my.specific.prefix."
## DNS name of the wavefront proxy server
host = "wavefront.example.com"
## Port that the Wavefront proxy server listens on
port = 2878
## wether to use "value" for name of simple fields
#simple_fields = false
## character to use between metric and field name. defaults to . (dot)
#metric_separator = "."
## Convert metric name paths to use metricSeperator character
## When true (default) will convert all _ (underscore) chartacters in final metric name
#convert_paths = true
## Use Regex to sanitize metric and tag names from invalid characters
## Regex is more thorough, but significantly slower
#use_regex = false
## point tags to use as the source name for Wavefront (if none found, host will be used)
#source_override = ["hostname", "snmp_host", "node_host"]
## Print additional debug information requires debug = true at the agent level
#debug_all = false
`

type MetricLine struct {
Metric string
Value string
Timestamp int64
Tags string
}

func (w *Wavefront) Connect() error {

if w.ConvertPaths && w.MetricSeparator == "_" {
w.ConvertPaths = false
}
if w.ConvertPaths {
pathReplacer = strings.NewReplacer("_", w.MetricSeparator)
}

// Test Connection to Wavefront proxy Server
uri := fmt.Sprintf("%s:%d", w.Host, w.Port)
tcpAddr, err := net.ResolveTCPAddr("tcp", uri)
if err != nil {
return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error())
}
connection, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
}
defer connection.Close()
return nil
}

func (w *Wavefront) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}

// Send Data to Wavefront proxy Server
uri := fmt.Sprintf("%s:%d", w.Host, w.Port)
tcpAddr, _ := net.ResolveTCPAddr("tcp", uri)
connection, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error())
}
defer connection.Close()

for _, m := range metrics {
for _, metric := range buildMetrics(m, w) {
messageLine := fmt.Sprintf("%s %s %v %s\n", metric.Metric, metric.Value, metric.Timestamp, metric.Tags)
log.Printf("D! Output [wavefront] %s", messageLine)
_, err := connection.Write([]byte(messageLine))
if err != nil {
return fmt.Errorf("Wavefront: TCP writing error %s", err.Error())
}
}
}

return nil
}

func buildTags(mTags map[string]string, w *Wavefront) []string {
sourceTagFound := false

for _, s := range w.SourceOverride {
for k, v := range mTags {
if k == s {
mTags["source"] = v
mTags["telegraf_host"] = mTags["host"]
sourceTagFound = true
delete(mTags, k)
break
}
}
if sourceTagFound {
break
}
}

if !sourceTagFound {
mTags["source"] = mTags["host"]
}
delete(mTags, "host")

tags := make([]string, len(mTags))
index := 0
for k, v := range mTags {
if w.UseRegex {
tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedRegex.ReplaceAllString(k, "-"), tagValueReplacer.Replace(v))
} else {
tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedChars.Replace(k), tagValueReplacer.Replace(v))
}

index++
}

sort.Strings(tags)
return tags
}

func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricLine {
if w.DebugAll {
log.Printf("D! Output [wavefront] original name: %s\n", m.Name())
}

ret := []*MetricLine{}
for fieldName, value := range m.Fields() {
if w.DebugAll {
log.Printf("D! Output [wavefront] original field: %s\n", fieldName)
}

var name string
if !w.SimpleFields && fieldName == "value" {
name = fmt.Sprintf("%s%s", w.Prefix, m.Name())
} else {
name = fmt.Sprintf("%s%s%s%s", w.Prefix, m.Name(), w.MetricSeparator, fieldName)
}

if w.UseRegex {
name = sanitizedRegex.ReplaceAllLiteralString(name, "-")
} else {
name = sanitizedChars.Replace(name)
}

if w.ConvertPaths {
name = pathReplacer.Replace(name)
}

metric := &MetricLine{
Metric: name,
Timestamp: m.UnixNano() / 1000000000,
}
metricValue, buildError := buildValue(value, metric.Metric)
if buildError != nil {
log.Printf("E! Output [wavefront] %s\n", buildError.Error())
continue
}
metric.Value = metricValue
tagsSlice := buildTags(m.Tags(), w)
metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " "))
ret = append(ret, metric)
}
return ret
}

func buildValue(v interface{}, name string) (string, error) {
var retv string
switch p := v.(type) {
case int64:
retv = IntToString(int64(p))
case uint64:
retv = UIntToString(uint64(p))
case float64:
retv = FloatToString(float64(p))
default:
return retv, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name)
}
return retv, nil
}

func IntToString(input_num int64) string {
return strconv.FormatInt(input_num, 10)
}

func UIntToString(input_num uint64) string {
return strconv.FormatUint(input_num, 10)
}

func FloatToString(input_num float64) string {
return strconv.FormatFloat(input_num, 'f', 6, 64)
}

func (w *Wavefront) SampleConfig() string {
return sampleConfig
}

func (w *Wavefront) Description() string {
return "Configuration for Wavefront server to send metrics to"
}

func (w *Wavefront) Close() error {
return nil
}

func init() {
outputs.Add("wavefront", func() telegraf.Output {
return &Wavefront{
MetricSeparator: ".",
ConvertPaths: true,
}
})
}
Loading

0 comments on commit cc1b593

Please sign in to comment.