From cc1b593f40608a0daf7c08c3e47e2c6397fdb2c8 Mon Sep 17 00:00:00 2001 From: Connor Gorman Date: Tue, 9 May 2017 09:14:42 -0700 Subject: [PATCH] Add wavefront plugin --- plugins/outputs/all/all.go | 1 + plugins/outputs/wavefront/README.md | 117 +++++++++ plugins/outputs/wavefront/wavefront.go | 262 ++++++++++++++++++++ plugins/outputs/wavefront/wavefront_test.go | 211 ++++++++++++++++ 4 files changed, 591 insertions(+) create mode 100644 plugins/outputs/wavefront/README.md create mode 100644 plugins/outputs/wavefront/wavefront.go create mode 100644 plugins/outputs/wavefront/wavefront_test.go diff --git a/plugins/outputs/all/all.go b/plugins/outputs/all/all.go index 089a5690977e5..53d1cb8cab8cb 100644 --- a/plugins/outputs/all/all.go +++ b/plugins/outputs/all/all.go @@ -23,4 +23,5 @@ import ( _ "github.com/influxdata/telegraf/plugins/outputs/riemann" _ "github.com/influxdata/telegraf/plugins/outputs/riemann_legacy" _ "github.com/influxdata/telegraf/plugins/outputs/socket_writer" + _ "github.com/influxdata/telegraf/plugins/outputs/wavefront" ) diff --git a/plugins/outputs/wavefront/README.md b/plugins/outputs/wavefront/README.md new file mode 100644 index 0000000000000..27e4566cd1c1d --- /dev/null +++ b/plugins/outputs/wavefront/README.md @@ -0,0 +1,117 @@ +# Wavefront Output Plugin + +This plugin writes to a [Wavefront](https://www.wavefront.com) proxy, in Wavefront data format over TCP. + + +## Wavefront Data format + +The expected input for Wavefront is specified in the following way: + +``` + [] = [tagk1=tagv1 ...tagkN=tagvN] +``` + +More information about the Wavefront data format is available [here](https://community.wavefront.com/docs/DOC-1031) + + +By default, to ease Metrics browsing in the Wavefront UI, metrics are grouped by converting any `_` characters to `.` in the final name. +This behavior can be altered by changing the `metric_separator` and/or the `convert_paths` settings. +Most illegal characters in the metric name are automatically converted to `-`. +The `use_regex` setting can be used to ensure all illegal characters are properly handled, but can lead to performance degradation. + +## Configuration: + +```toml +# Configuration for Wavefront output +[[outputs.wavefront]] + ## prefix for metrics keys + prefix = "my.specific.prefix." + + ## DNS name of the wavefront proxy server + host = "wavefront.example.com" + + ## Port that the Wavefront proxy server listens on + port = 2878 + + ## wether to use "value" for name of simple fields + simple_fields = false + + ## character to use between metric and field name. defaults to . (dot) + metric_separator = "." + + ## Convert metric name paths to use metricSeperator character + ## When true (default) will convert all _ (underscore) chartacters in final metric name + convert_paths = true + + ## Use Regex to sanitize metric and tag names from invalid characters + ## Regex is more thorough, but significantly slower + use_regex = false + + ## point tags to use as the source name for Wavefront (if none found, host will be used) + source_override = ["hostname", "snmp_host", "node_host"] + + ## Print additional debug information requires debug = true at the agent level + debug_all = false +``` + +Parameters: + + Prefix string + Host string + Port int + SimpleFields bool + MetricSeparator string + ConvertPaths bool + UseRegex bool + SourceOverride string + DebugAll bool + +* `prefix`: String to use as a prefix for all sent metrics. +* `host`: Name of Wavefront proxy server +* `port`: Port that Wavefront proxy server is configured for `pushListenerPorts` +* `simple_fields`: if false (default) metric field names called `value` are converted to empty strings +* `metric_separator`: character to use to separate metric and field names. (default is `_`) +* `convert_paths`: if true (default) will convert all `_` in metric and field names to `metric_seperator` +* `use_regex`: if true (default is false) will use regex to ensure all illegal characters are converted to `-`. Regex is much slower than the default mode which will catch most illegal characters. Use with caution. +* `source_override`: ordered list of point tags to use as the source name for Wavefront. Once a match is found, that tag is used as the source for that point. If no tags are found the host tag will be used. +* `debug_all`: Will output additional debug information. Requires `debug = true` to be configured at the agent level + + +## + +The Wavefront proxy interface can be simulated with this reader: + +``` +// wavefront_proxy_mock.go +package main + +import ( + "io" + "log" + "net" + "os" +) + +func main() { + l, err := net.Listen("tcp", "localhost:2878") + if err != nil { + log.Fatal(err) + } + defer l.Close() + for { + conn, err := l.Accept() + if err != nil { + log.Fatal(err) + } + go func(c net.Conn) { + defer c.Close() + io.Copy(os.Stdout, c) + }(conn) + } +} + +``` + +## Allowed values for metrics + +Wavefront allows `integers` and `floats` as input values diff --git a/plugins/outputs/wavefront/wavefront.go b/plugins/outputs/wavefront/wavefront.go new file mode 100644 index 0000000000000..e664ee6b192c3 --- /dev/null +++ b/plugins/outputs/wavefront/wavefront.go @@ -0,0 +1,262 @@ +package wavefront + +import ( + "fmt" + "net" + "regexp" + "sort" + "strconv" + "strings" + + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/plugins/outputs" + "log" +) + +type Wavefront struct { + Prefix string + Host string + Port int + SimpleFields bool + MetricSeparator string + ConvertPaths bool + UseRegex bool + SourceOverride []string + DebugAll bool +} + +// catch many of the invalid chars that could appear in a metric or tag name +var sanitizedChars = strings.NewReplacer( + "!", "-", "@", "-", "#", "-", "$", "-", "%", "-", "^", "-", "&", "-", + "*", "-", "(", "-", ")", "-", "+", "-", "`", "-", "'", "-", "\"", "-", + "[", "-", "]", "-", "{", "-", "}", "-", ":", "-", ";", "-", "<", "-", + ">", "-", ",", "-", "?", "-", "/", "-", "\\", "-", "|", "-", " ", "-", +) + +// instead of Replacer which may miss some special characters we can use a regex pattern, but this is significantly slower than Replacer +var sanitizedRegex, _ = regexp.Compile("[^a-zA-Z\\d_.-]") + +var tagValueReplacer = strings.NewReplacer("\"", "\\\"", "*", "-") + +var pathReplacer = strings.NewReplacer("_", "_") + +var sampleConfig = ` + ## prefix for metrics keys + #prefix = "my.specific.prefix." + + ## DNS name of the wavefront proxy server + host = "wavefront.example.com" + + ## Port that the Wavefront proxy server listens on + port = 2878 + + ## wether to use "value" for name of simple fields + #simple_fields = false + + ## character to use between metric and field name. defaults to . (dot) + #metric_separator = "." + + ## Convert metric name paths to use metricSeperator character + ## When true (default) will convert all _ (underscore) chartacters in final metric name + #convert_paths = true + + ## Use Regex to sanitize metric and tag names from invalid characters + ## Regex is more thorough, but significantly slower + #use_regex = false + + ## point tags to use as the source name for Wavefront (if none found, host will be used) + #source_override = ["hostname", "snmp_host", "node_host"] + + ## Print additional debug information requires debug = true at the agent level + #debug_all = false +` + +type MetricLine struct { + Metric string + Value string + Timestamp int64 + Tags string +} + +func (w *Wavefront) Connect() error { + + if w.ConvertPaths && w.MetricSeparator == "_" { + w.ConvertPaths = false + } + if w.ConvertPaths { + pathReplacer = strings.NewReplacer("_", w.MetricSeparator) + } + + // Test Connection to Wavefront proxy Server + uri := fmt.Sprintf("%s:%d", w.Host, w.Port) + tcpAddr, err := net.ResolveTCPAddr("tcp", uri) + if err != nil { + return fmt.Errorf("Wavefront: TCP address cannot be resolved %s", err.Error()) + } + connection, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) + } + defer connection.Close() + return nil +} + +func (w *Wavefront) Write(metrics []telegraf.Metric) error { + if len(metrics) == 0 { + return nil + } + + // Send Data to Wavefront proxy Server + uri := fmt.Sprintf("%s:%d", w.Host, w.Port) + tcpAddr, _ := net.ResolveTCPAddr("tcp", uri) + connection, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + return fmt.Errorf("Wavefront: TCP connect fail %s", err.Error()) + } + defer connection.Close() + + for _, m := range metrics { + for _, metric := range buildMetrics(m, w) { + messageLine := fmt.Sprintf("%s %s %v %s\n", metric.Metric, metric.Value, metric.Timestamp, metric.Tags) + log.Printf("D! Output [wavefront] %s", messageLine) + _, err := connection.Write([]byte(messageLine)) + if err != nil { + return fmt.Errorf("Wavefront: TCP writing error %s", err.Error()) + } + } + } + + return nil +} + +func buildTags(mTags map[string]string, w *Wavefront) []string { + sourceTagFound := false + + for _, s := range w.SourceOverride { + for k, v := range mTags { + if k == s { + mTags["source"] = v + mTags["telegraf_host"] = mTags["host"] + sourceTagFound = true + delete(mTags, k) + break + } + } + if sourceTagFound { + break + } + } + + if !sourceTagFound { + mTags["source"] = mTags["host"] + } + delete(mTags, "host") + + tags := make([]string, len(mTags)) + index := 0 + for k, v := range mTags { + if w.UseRegex { + tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedRegex.ReplaceAllString(k, "-"), tagValueReplacer.Replace(v)) + } else { + tags[index] = fmt.Sprintf("%s=\"%s\"", sanitizedChars.Replace(k), tagValueReplacer.Replace(v)) + } + + index++ + } + + sort.Strings(tags) + return tags +} + +func buildMetrics(m telegraf.Metric, w *Wavefront) []*MetricLine { + if w.DebugAll { + log.Printf("D! Output [wavefront] original name: %s\n", m.Name()) + } + + ret := []*MetricLine{} + for fieldName, value := range m.Fields() { + if w.DebugAll { + log.Printf("D! Output [wavefront] original field: %s\n", fieldName) + } + + var name string + if !w.SimpleFields && fieldName == "value" { + name = fmt.Sprintf("%s%s", w.Prefix, m.Name()) + } else { + name = fmt.Sprintf("%s%s%s%s", w.Prefix, m.Name(), w.MetricSeparator, fieldName) + } + + if w.UseRegex { + name = sanitizedRegex.ReplaceAllLiteralString(name, "-") + } else { + name = sanitizedChars.Replace(name) + } + + if w.ConvertPaths { + name = pathReplacer.Replace(name) + } + + metric := &MetricLine{ + Metric: name, + Timestamp: m.UnixNano() / 1000000000, + } + metricValue, buildError := buildValue(value, metric.Metric) + if buildError != nil { + log.Printf("E! Output [wavefront] %s\n", buildError.Error()) + continue + } + metric.Value = metricValue + tagsSlice := buildTags(m.Tags(), w) + metric.Tags = fmt.Sprint(strings.Join(tagsSlice, " ")) + ret = append(ret, metric) + } + return ret +} + +func buildValue(v interface{}, name string) (string, error) { + var retv string + switch p := v.(type) { + case int64: + retv = IntToString(int64(p)) + case uint64: + retv = UIntToString(uint64(p)) + case float64: + retv = FloatToString(float64(p)) + default: + return retv, fmt.Errorf("unexpected type: %T, with value: %v, for: %s", v, v, name) + } + return retv, nil +} + +func IntToString(input_num int64) string { + return strconv.FormatInt(input_num, 10) +} + +func UIntToString(input_num uint64) string { + return strconv.FormatUint(input_num, 10) +} + +func FloatToString(input_num float64) string { + return strconv.FormatFloat(input_num, 'f', 6, 64) +} + +func (w *Wavefront) SampleConfig() string { + return sampleConfig +} + +func (w *Wavefront) Description() string { + return "Configuration for Wavefront server to send metrics to" +} + +func (w *Wavefront) Close() error { + return nil +} + +func init() { + outputs.Add("wavefront", func() telegraf.Output { + return &Wavefront{ + MetricSeparator: ".", + ConvertPaths: true, + } + }) +} diff --git a/plugins/outputs/wavefront/wavefront_test.go b/plugins/outputs/wavefront/wavefront_test.go new file mode 100644 index 0000000000000..45777713eda68 --- /dev/null +++ b/plugins/outputs/wavefront/wavefront_test.go @@ -0,0 +1,211 @@ +package wavefront + +import ( + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/metric" + "github.com/influxdata/telegraf/testutil" + "reflect" + "strings" + "testing" + "time" +) + +func defaultWavefront() *Wavefront { + return &Wavefront{ + Host: "localhost", + Port: 2878, + Prefix: "testWF.", + SimpleFields: false, + MetricSeparator: ".", + ConvertPaths: true, + UseRegex: false, + } +} + +func TestSourceTags(t *testing.T) { + w := defaultWavefront() + w.SourceOverride = []string{"snmp_host", "hostagent"} + + var tagtests = []struct { + ptIn map[string]string + outTags []string + }{ + { + map[string]string{"snmp_host": "realHost", "host": "origHost"}, + []string{"source=\"realHost\"", "telegraf_host=\"origHost\""}, + }, + { + map[string]string{"hostagent": "realHost", "host": "origHost"}, + []string{"source=\"realHost\"", "telegraf_host=\"origHost\""}, + }, + { + map[string]string{"hostagent": "abc", "snmp_host": "realHost", "host": "origHost"}, + []string{"hostagent=\"abc\"", "source=\"realHost\"", "telegraf_host=\"origHost\""}, + }, + { + map[string]string{"something": "abc", "host": "realHost"}, + []string{"something=\"abc\"", "source=\"realHost\""}, + }, + } + for _, tt := range tagtests { + tags := buildTags(tt.ptIn, w) + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags) + } + } +} + +func TestBuildMetricsNoSimpleFields(t *testing.T) { + w := defaultWavefront() + w.UseRegex = false + w.Prefix = "testthis." + w.SimpleFields = false + + pathReplacer = strings.NewReplacer("_", w.MetricSeparator) + + testMetric1, _ := metric.New( + "test.simple.metric", + map[string]string{"tag1": "value1"}, + map[string]interface{}{"value": 123}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + var metricTests = []struct { + metric telegraf.Metric + metricLines []MetricLine + }{ + { + testutil.TestMetric(float64(1.0), "testing_just*a%metric:float"), + []MetricLine{{Metric: w.Prefix + "testing.just-a-metric-float", Value: "1.000000"}}, + }, + { + testMetric1, + []MetricLine{{Metric: w.Prefix + "test.simple.metric", Value: "123"}}, + }, + } + + for _, mt := range metricTests { + ml := buildMetrics(mt.metric, w) + for i, line := range ml { + if mt.metricLines[i].Metric != line.Metric || mt.metricLines[i].Value != line.Value { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", mt.metricLines[i].Metric+" "+mt.metricLines[i].Value, line.Metric+" "+line.Value) + } + } + } + +} + +func TestBuildMetricsWithSimpleFields(t *testing.T) { + w := defaultWavefront() + w.UseRegex = false + w.Prefix = "testthis." + w.SimpleFields = true + + pathReplacer = strings.NewReplacer("_", w.MetricSeparator) + + testMetric1, _ := metric.New( + "test.simple.metric", + map[string]string{"tag1": "value1"}, + map[string]interface{}{"value": 123}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ) + + var metricTests = []struct { + metric telegraf.Metric + metricLines []MetricLine + }{ + { + testutil.TestMetric(float64(1.0), "testing_just*a%metric:float"), + []MetricLine{{Metric: w.Prefix + "testing.just-a-metric-float.value", Value: "1.000000"}}, + }, + { + testMetric1, + []MetricLine{{Metric: w.Prefix + "test.simple.metric.value", Value: "123"}}, + }, + } + + for _, mt := range metricTests { + ml := buildMetrics(mt.metric, w) + for i, line := range ml { + if mt.metricLines[i].Metric != line.Metric || mt.metricLines[i].Value != line.Value { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", mt.metricLines[i].Metric+" "+mt.metricLines[i].Value, line.Metric+" "+line.Value) + } + } + } + +} + +func TestBuildTags(t *testing.T) { + + w := defaultWavefront() + + var tagtests = []struct { + ptIn map[string]string + outTags []string + }{ + { + map[string]string{"one": "two", "three": "four", "host": "testHost"}, + []string{"one=\"two\"", "source=\"testHost\"", "three=\"four\""}, + }, + { + map[string]string{"aaa": "bbb", "host": "testHost"}, + []string{"aaa=\"bbb\"", "source=\"testHost\""}, + }, + { + map[string]string{"bbb": "789", "aaa": "123", "host": "testHost"}, + []string{"aaa=\"123\"", "bbb=\"789\"", "source=\"testHost\""}, + }, + { + map[string]string{"host": "aaa", "dc": "bbb"}, + []string{"dc=\"bbb\"", "source=\"aaa\""}, + }, + { + map[string]string{"Sp%ci@l Chars": "\"g*t repl#ced", "host": "testHost"}, + []string{"Sp-ci-l-Chars=\"\\\"g-t repl#ced\"", "source=\"testHost\""}, + }, + } + for _, tt := range tagtests { + tags := buildTags(tt.ptIn, w) + if !reflect.DeepEqual(tags, tt.outTags) { + t.Errorf("\nexpected\t%+v\nreceived\t%+v\n", tt.outTags, tags) + } + } +} + +// func TestWrite(t *testing.T) { +// if testing.Short() { +// t.Skip("Skipping integration test in short mode") +// } + +// w := &Wavefront{ +// Host: testutil.GetLocalHost(), +// Port: 2878, +// Prefix: "prefix.test.", +// } + +// // Verify that we can connect to the Wavefront instance +// err := w.Connect() +// require.NoError(t, err) + +// // Verify that we can successfully write data to Wavefront +// err = w.Write(testutil.MockMetrics()) +// require.NoError(t, err) + +// // Verify postive and negative test cases of writing data +// metrics := testutil.MockMetrics() +// metrics = append(metrics, testutil.TestMetric(float64(1.0), +// "justametric.float")) +// metrics = append(metrics, testutil.TestMetric(int64(123456789), +// "justametric.int")) +// metrics = append(metrics, testutil.TestMetric(uint64(123456789012345), +// "justametric.uint")) +// metrics = append(metrics, testutil.TestMetric("Lorem Ipsum", +// "justametric.string")) +// metrics = append(metrics, testutil.TestMetric(float64(42.0), +// "justametric.anotherfloat")) +// metrics = append(metrics, testutil.TestMetric(float64(42.0), +// "metric w/ specialchars")) + +// err = w.Write(metrics) +// require.NoError(t, err) +// }