Skip to content

Commit

Permalink
Merge branch 'jipperinbham-datadog-output'
Browse files Browse the repository at this point in the history
  • Loading branch information
sparrc committed Aug 14, 2015
2 parents c26fa33 + 0ae5075 commit cad0a76
Show file tree
Hide file tree
Showing 4 changed files with 361 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## v0.1.6 [unreleased]

### Features
[#112](://github.com/influxdb/telegraf/pull/112): Datadog output. Thanks @jipperinbham!

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions outputs/all/all.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package all

import (
_ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb"
)
155 changes: 155 additions & 0 deletions outputs/datadog/datadog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package datadog

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sort"

"github.com/influxdb/influxdb/client"
t "github.com/influxdb/telegraf"
"github.com/influxdb/telegraf/outputs"
)

type Datadog struct {
Apikey string
Timeout t.Duration

apiUrl string
client *http.Client
}

type TimeSeries struct {
Series []*Metric `json:"series"`
}

type Metric struct {
Metric string `json:"metric"`
Points [1]Point `json:"points"`
Tags []string `json:"tags,omitempty"`
}

type Point [2]float64

const datadog_api = "https://app.datadoghq.com/api/v1/series"

func NewDatadog(apiUrl string) *Datadog {
return &Datadog{
apiUrl: apiUrl,
}
}

func (d *Datadog) Connect() error {
if d.Apikey == "" {
return fmt.Errorf("apikey is a required field for datadog output")
}
d.client = &http.Client{
Timeout: d.Timeout.Duration,
}
return nil
}

func (d *Datadog) Write(bp client.BatchPoints) error {
if len(bp.Points) == 0 {
return nil
}
ts := TimeSeries{
Series: make([]*Metric, len(bp.Points)),
}
for index, pt := range bp.Points {
metric := &Metric{
Metric: pt.Measurement,
Tags: buildTags(bp.Tags, pt.Tags),
}
if p, err := buildPoint(bp, pt); err == nil {
metric.Points[0] = p
}
ts.Series[index] = metric
}
tsBytes, err := json.Marshal(ts)
if err != nil {
return fmt.Errorf("unable to marshal TimeSeries, %s\n", err.Error())
}
req, err := http.NewRequest("POST", d.authenticatedUrl(), bytes.NewBuffer(tsBytes))
if err != nil {
return fmt.Errorf("unable to create http.Request, %s\n", err.Error())
}
req.Header.Add("Content-Type", "application/json")

resp, err := d.client.Do(req)
defer resp.Body.Close()
if err != nil {
return fmt.Errorf("error POSTing metrics, %s\n", err.Error())
}

if resp.StatusCode < 200 || resp.StatusCode > 209 {
return fmt.Errorf("received bad status code, %d\n", resp.StatusCode)
}

return nil
}

func (d *Datadog) authenticatedUrl() string {
q := url.Values{
"api_key": []string{d.Apikey},
}
return fmt.Sprintf("%s?%s", d.apiUrl, q.Encode())
}

func buildTags(bpTags map[string]string, ptTags map[string]string) []string {
tags := make([]string, (len(bpTags) + len(ptTags)))
index := 0
for k, v := range bpTags {
tags[index] = fmt.Sprintf("%s:%s", k, v)
index += 1
}
for k, v := range ptTags {
tags[index] = fmt.Sprintf("%s:%s", k, v)
index += 1
}
sort.Strings(tags)
return tags
}

func buildPoint(bp client.BatchPoints, pt client.Point) (Point, error) {
var p Point
if err := p.setValue(pt.Fields["value"]); err != nil {
return p, fmt.Errorf("unable to extract value from Fields, %s", err.Error())
}
if pt.Time.IsZero() {
p[0] = float64(bp.Time.Unix())
} else {
p[0] = float64(pt.Time.Unix())
}
return p, nil
}

func (p *Point) setValue(v interface{}) error {
switch d := v.(type) {
case int:
p[1] = float64(int(d))
case int32:
p[1] = float64(int32(d))
case int64:
p[1] = float64(int64(d))
case float32:
p[1] = float64(d)
case float64:
p[1] = float64(d)
default:
return fmt.Errorf("undeterminable type")
}
return nil
}

func (d *Datadog) Close() error {
return nil
}

func init() {
outputs.Add("datadog", func() outputs.Output {
return NewDatadog(datadog_api)
})
}
204 changes: 204 additions & 0 deletions outputs/datadog/datadog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
package datadog

import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/influxdb/influxdb/client"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
fakeUrl = "http://test.datadog.com"
fakeApiKey = "123456"
)

func fakeDatadog() *Datadog {
d := NewDatadog(fakeUrl)
d.Apikey = fakeApiKey
return d
}

func testData() client.BatchPoints {
var bp client.BatchPoints
bp.Time = time.Now()
bp.Tags = map[string]string{"tag1": "value1"}
bp.Points = []client.Point{
{
Fields: map[string]interface{}{"value": 1.0},
},
}
return bp
}

func TestUriOverride(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(`{"status":"ok"}`)
}))
defer ts.Close()

d := NewDatadog(ts.URL)
d.Apikey = "123456"
err := d.Connect()
require.NoError(t, err)
err = d.Write(testData())
require.NoError(t, err)
}

func TestBadStatusCode(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(`{ 'errors': [
'Something bad happened to the server.',
'Your query made the server very sad.'
]
}`)
}))
defer ts.Close()

d := NewDatadog(ts.URL)
d.Apikey = "123456"
err := d.Connect()
require.NoError(t, err)
err = d.Write(testData())
if err == nil {
t.Errorf("error expected but none returned")
} else {
require.EqualError(t, fmt.Errorf("received bad status code, 500\n"), err.Error())
}
}

func TestAuthenticatedUrl(t *testing.T) {
d := fakeDatadog()

authUrl := d.authenticatedUrl()
assert.EqualValues(t, fmt.Sprintf("%s?api_key=%s", fakeUrl, fakeApiKey), authUrl)
}

func TestBuildTags(t *testing.T) {
var tagtests = []struct {
bpIn map[string]string
ptIn map[string]string
outTags []string
}{
{
map[string]string{"one": "two"},
map[string]string{"three": "four"},
[]string{"one:two", "three:four"},
},
{
map[string]string{"aaa": "bbb"},
map[string]string{},
[]string{"aaa:bbb"},
},
{
map[string]string{},
map[string]string{},
[]string{},
},
}
for _, tt := range tagtests {
tags := buildTags(tt.bpIn, tt.ptIn)
if !reflect.DeepEqual(tags, tt.outTags) {
t.Errorf("\nexpected %+v\ngot %+v\n", tt.outTags, tags)
}
}
}

func TestBuildPoint(t *testing.T) {
var tagtests = []struct {
bpIn client.BatchPoints
ptIn client.Point
outPt Point
err error
}{
{
client.BatchPoints{
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
client.Point{
Fields: map[string]interface{}{"value": 0.0},
},
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 0.0},
nil,
},
{
client.BatchPoints{},
client.Point{
Fields: map[string]interface{}{"value": 1.0},
Time: time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC),
},
Point{float64(time.Date(2010, time.December, 10, 23, 0, 0, 0, time.UTC).Unix()), 1.0},
nil,
},
{
client.BatchPoints{
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
client.Point{
Fields: map[string]interface{}{"value": 10},
},
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 10.0},
nil,
},
{
client.BatchPoints{
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
client.Point{
Fields: map[string]interface{}{"value": int32(112345)},
},
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0},
nil,
},
{
client.BatchPoints{
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
client.Point{
Fields: map[string]interface{}{"value": int64(112345)},
},
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 112345.0},
nil,
},
{
client.BatchPoints{
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
client.Point{
Fields: map[string]interface{}{"value": float32(11234.5)},
},
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5},
nil,
},
{
client.BatchPoints{
Time: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
},
client.Point{
Fields: map[string]interface{}{"value": "11234.5"},
},
Point{float64(time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC).Unix()), 11234.5},
fmt.Errorf("unable to extract value from Fields, undeterminable type"),
},
}
for _, tt := range tagtests {
pt, err := buildPoint(tt.bpIn, tt.ptIn)
if err != nil && tt.err == nil {
t.Errorf("unexpected error, %+v\n", err)
}
if tt.err != nil && err == nil {
t.Errorf("expected an error (%s) but none returned", tt.err.Error())
}
if !reflect.DeepEqual(pt, tt.outPt) && tt.err == nil {
t.Errorf("\nexpected %+v\ngot %+v\n", tt.outPt, pt)
}
}
}

0 comments on commit cad0a76

Please sign in to comment.