From 2fa033357120139896e541fd5a0a78fd99bdd6d2 Mon Sep 17 00:00:00 2001 From: JP Date: Mon, 26 Oct 2015 13:31:21 -0500 Subject: [PATCH] add librato output plugin, update datadog plugin to skip non-number metrics --- outputs/all/all.go | 1 + outputs/datadog/README.md | 7 + outputs/datadog/datadog.go | 16 ++- outputs/librato/README.md | 10 ++ outputs/librato/librato.go | 165 +++++++++++++++++++++ outputs/librato/librato_test.go | 245 ++++++++++++++++++++++++++++++++ 6 files changed, 439 insertions(+), 5 deletions(-) create mode 100644 outputs/datadog/README.md create mode 100644 outputs/librato/README.md create mode 100644 outputs/librato/librato.go create mode 100644 outputs/librato/librato_test.go diff --git a/outputs/all/all.go b/outputs/all/all.go index 6538af0d2a71f..4967cf8506930 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -5,6 +5,7 @@ import ( _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" _ "github.com/influxdb/telegraf/outputs/kafka" + _ "github.com/influxdb/telegraf/outputs/librato" _ "github.com/influxdb/telegraf/outputs/mqtt" _ "github.com/influxdb/telegraf/outputs/opentsdb" ) diff --git a/outputs/datadog/README.md b/outputs/datadog/README.md new file mode 100644 index 0000000000000..6050a6e3b2058 --- /dev/null +++ b/outputs/datadog/README.md @@ -0,0 +1,7 @@ +# Datadog Output Plugin + +This plugin writes to the [Datadog Metrics API](http://docs.datadoghq.com/api/#metrics). + +If the point value being sent cannot be converted to a float64, the metric is skipped. + +Metrics are grouped by converting any `_` characters to `.` in the Point Name. \ No newline at end of file diff --git a/outputs/datadog/datadog.go b/outputs/datadog/datadog.go index 7b87a43e159a2..338861b45b3a9 100644 --- a/outputs/datadog/datadog.go +++ b/outputs/datadog/datadog.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "log" "net/http" "net/url" "sort" @@ -65,10 +66,10 @@ func (d *Datadog) Write(points []*client.Point) error { if len(points) == 0 { return nil } - ts := TimeSeries{ - Series: make([]*Metric, len(points)), - } - for index, pt := range points { + ts := TimeSeries{} + var tempSeries = make([]*Metric, len(points)) + var acceptablePoints = 0 + for _, pt := range points { metric := &Metric{ Metric: strings.Replace(pt.Name(), "_", ".", -1), Tags: buildTags(pt.Tags()), @@ -76,9 +77,14 @@ func (d *Datadog) Write(points []*client.Point) error { } if p, err := buildPoint(pt); err == nil { metric.Points[0] = p + tempSeries[acceptablePoints] = metric + acceptablePoints += 1 + } else { + log.Printf("unable to build Metric for %s, skipping\n", pt.Name()) } - ts.Series[index] = metric } + ts.Series = make([]*Metric, acceptablePoints) + copy(ts.Series, tempSeries[0:]) tsBytes, err := json.Marshal(ts) if err != nil { return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error()) diff --git a/outputs/librato/README.md b/outputs/librato/README.md new file mode 100644 index 0000000000000..6f24518efa9bb --- /dev/null +++ b/outputs/librato/README.md @@ -0,0 +1,10 @@ +# Librato Output Plugin + +This plugin writes to the [Librato Metrics API](http://dev.librato.com/v1/metrics#metrics). + +The `source_tag` option in the Configuration file is used to send contextual information from +Point Tags to the API. + +If the point value being sent cannot be converted to a float64, the metric is skipped. + +Currently, the plugin does not send any associated Point Tags. \ No newline at end of file diff --git a/outputs/librato/librato.go b/outputs/librato/librato.go new file mode 100644 index 0000000000000..fcb7cb2a70122 --- /dev/null +++ b/outputs/librato/librato.go @@ -0,0 +1,165 @@ +package librato + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/telegraf/duration" + "github.com/influxdb/telegraf/outputs" +) + +type Librato struct { + User string + Token string + SourceTag string + Timeout duration.Duration + + apiUrl string + client *http.Client +} + +var sampleConfig = ` + # Librator API Docs + # http://dev.librato.com/v1/metrics-authentication + + # Librato API user + token = "my-secret-token" # required. + + # Librato API token + token = "my-secret-token" # required. + + # Tag Field to populate source attribute (optional) + # This is typically the _hostname_ from which the metric was obtained. + source_tag = "hostname" + + # Connection timeout. + # timeout = "5s" +` + +type Metrics struct { + Gauges []*Gauge `json:"gauges"` +} + +type Gauge struct { + Name string `json:"name"` + Value float64 `json:"value"` + Source string `json:"source"` + MeasureTime int64 `json:"measure_time"` +} + +const librato_api = "https://metrics-api.librato.com/v1/metrics" + +func NewLibrato(apiUrl string) *Librato { + return &Librato{ + apiUrl: apiUrl, + } +} + +func (l *Librato) Connect() error { + if l.User == "" || l.Token == "" { + return fmt.Errorf("user and token are required fields for librato output") + } + l.client = &http.Client{ + Timeout: l.Timeout.Duration, + } + return nil +} + +func (l *Librato) Write(points []*client.Point) error { + if len(points) == 0 { + return nil + } + metrics := Metrics{} + var tempGauges = make([]*Gauge, len(points)) + var acceptablePoints = 0 + for _, pt := range points { + if gauge, err := l.buildGauge(pt); err == nil { + tempGauges[acceptablePoints] = gauge + acceptablePoints += 1 + } else { + log.Printf("unable to build Gauge for %s, skipping\n", pt.Name()) + } + } + metrics.Gauges = make([]*Gauge, acceptablePoints) + copy(metrics.Gauges, tempGauges[0:]) + metricsBytes, err := json.Marshal(metrics) + if err != nil { + return fmt.Errorf("unable to marshal Metrics, %s\n", err.Error()) + } + req, err := http.NewRequest("POST", l.apiUrl, bytes.NewBuffer(metricsBytes)) + if err != nil { + return fmt.Errorf("unable to create http.Request, %s\n", err.Error()) + } + req.Header.Add("Content-Type", "application/json") + req.SetBasicAuth(l.User, l.Token) + + resp, err := l.client.Do(req) + if err != nil { + return fmt.Errorf("error POSTing metrics, %s\n", err.Error()) + } + defer resp.Body.Close() + + if resp.StatusCode != 200 { + return fmt.Errorf("received bad status code, %d\n", resp.StatusCode) + } + + return nil +} + +func (l *Librato) SampleConfig() string { + return sampleConfig +} + +func (l *Librato) Description() string { + return "Configuration for Librato API to send metrics to." +} + +func (l *Librato) buildGauge(pt *client.Point) (*Gauge, error) { + gauge := &Gauge{ + Name: pt.Name(), + MeasureTime: pt.Time().Unix(), + } + if err := gauge.setValue(pt.Fields()["value"]); err != nil { + return gauge, fmt.Errorf("unable to extract value from Fields, %s\n", err.Error()) + } + if l.SourceTag != "" { + if source, ok := pt.Tags()[l.SourceTag]; ok { + gauge.Source = source + } else { + return gauge, fmt.Errorf("undeterminable Source type from Field, %s\n", l.SourceTag) + } + } + return gauge, nil +} + +func (g *Gauge) setValue(v interface{}) error { + switch d := v.(type) { + case int: + g.Value = float64(int(d)) + case int32: + g.Value = float64(int32(d)) + case int64: + g.Value = float64(int64(d)) + case float32: + g.Value = float64(d) + case float64: + g.Value = float64(d) + default: + return fmt.Errorf("undeterminable type %+v", d) + } + return nil +} + +func (l *Librato) Close() error { + return nil +} + +func init() { + outputs.Add("librato", func() outputs.Output { + return NewLibrato(librato_api) + }) +} diff --git a/outputs/librato/librato_test.go b/outputs/librato/librato_test.go new file mode 100644 index 0000000000000..63c74f6b25055 --- /dev/null +++ b/outputs/librato/librato_test.go @@ -0,0 +1,245 @@ +package librato + +import ( + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "reflect" + "testing" + "time" + + "github.com/influxdb/telegraf/testutil" + + "github.com/influxdb/influxdb/client/v2" + "github.com/stretchr/testify/require" +) + +var ( + fakeUrl = "http://test.librato.com" + fakeUser = "telegraf@influxdb.com" + fakeToken = "123456" +) + +func fakeLibrato() *Librato { + l := NewLibrato(fakeUrl) + l.User = fakeUser + l.Token = fakeToken + return l +} + +func TestUriOverride(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + + l := NewLibrato(ts.URL) + l.User = "telegraf@influxdb.com" + l.Token = "123456" + err := l.Connect() + require.NoError(t, err) + err = l.Write(testutil.MockBatchPoints().Points()) + require.NoError(t, err) +} + +func TestBadStatusCode(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(`{ + "errors": { + "system": [ + "The API is currently down for maintenance. It'll be back shortly." + ] + } + }`) + })) + defer ts.Close() + + l := NewLibrato(ts.URL) + l.User = "telegraf@influxdb.com" + l.Token = "123456" + err := l.Connect() + require.NoError(t, err) + err = l.Write(testutil.MockBatchPoints().Points()) + if err == nil { + t.Errorf("error expected but none returned") + } else { + require.EqualError(t, fmt.Errorf("received bad status code, 503\n"), err.Error()) + } +} + +func TestBuildGauge(t *testing.T) { + tags := make(map[string]string) + var gaugeTests = []struct { + ptIn *client.Point + outGauge *Gauge + err error + }{ + { + client.NewPoint( + "test1", + tags, + map[string]interface{}{"value": 0.0}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test1", + MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 0.0, + }, + nil, + }, + { + client.NewPoint( + "test2", + tags, + map[string]interface{}{"value": 1.0}, + time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test2", + MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 1.0, + }, + nil, + }, + { + client.NewPoint( + "test3", + tags, + map[string]interface{}{"value": 10}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test3", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 10.0, + }, + nil, + }, + { + client.NewPoint( + "test4", + tags, + map[string]interface{}{"value": int32(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test4", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 112345.0, + }, + nil, + }, + { + client.NewPoint( + "test5", + tags, + map[string]interface{}{"value": int64(112345)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test5", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 112345.0, + }, + nil, + }, + { + client.NewPoint( + "test6", + tags, + map[string]interface{}{"value": float32(11234.5)}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test6", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 11234.5, + }, + nil, + }, + { + client.NewPoint( + "test7", + tags, + map[string]interface{}{"value": "11234.5"}, + time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test7", + MeasureTime: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 11234.5, + }, + fmt.Errorf("unable to extract value from Fields, undeterminable type"), + }, + } + + l := NewLibrato(fakeUrl) + for _, gt := range gaugeTests { + gauge, err := l.buildGauge(gt.ptIn) + if err != nil && gt.err == nil { + t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err) + } + if gt.err != nil && err == nil { + t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error()) + } + if !reflect.DeepEqual(gauge, gt.outGauge) && gt.err == nil { + t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauge) + } + } +} + +func TestBuildGaugeWithSource(t *testing.T) { + var gaugeTests = []struct { + ptIn *client.Point + outGauge *Gauge + err error + }{ + { + client.NewPoint( + "test1", + map[string]string{"hostname": "192.168.0.1"}, + map[string]interface{}{"value": 0.0}, + time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test1", + MeasureTime: time.Date(2010, time.November, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 0.0, + Source: "192.168.0.1", + }, + nil, + }, + { + client.NewPoint( + "test2", + map[string]string{"hostnam": "192.168.0.1"}, + map[string]interface{}{"value": 1.0}, + time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC), + ), + &Gauge{ + Name: "test2", + MeasureTime: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix(), + Value: 1.0, + }, + fmt.Errorf("undeterminable Source type from Field, hostname"), + }, + } + + l := NewLibrato(fakeUrl) + l.SourceTag = "hostname" + for _, gt := range gaugeTests { + gauge, err := l.buildGauge(gt.ptIn) + if err != nil && gt.err == nil { + t.Errorf("%s: unexpected error, %+v\n", gt.ptIn.Name(), err) + } + if gt.err != nil && err == nil { + t.Errorf("%s: expected an error (%s) but none returned", gt.ptIn.Name(), gt.err.Error()) + } + if !reflect.DeepEqual(gauge, gt.outGauge) && gt.err == nil { + t.Errorf("%s: \nexpected %+v\ngot %+v\n", gt.ptIn.Name(), gt.outGauge, gauge) + } + } +}