Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add datadog metrics provider #460

Merged
merged 1 commit into from
Feb 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions artifacts/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ spec:
enum:
- prometheus
- influxdb
- datadog
address:
description: API address of this provider
type: string
Expand Down
1 change: 1 addition & 0 deletions charts/flagger/crds/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ spec:
enum:
- prometheus
- influxdb
- datadog
address:
description: API address of this provider
type: string
Expand Down
1 change: 1 addition & 0 deletions kustomize/base/flagger/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ spec:
enum:
- prometheus
- influxdb
- datadog
address:
description: API address of this provider
type: string
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,7 @@ func (c *Controller) runMetricChecks(canary *flaggerv1.Canary) bool {
}

factory := providers.Factory{}
provider, err := factory.Provider(template.Spec.Provider, credentials)
provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials)
if err != nil {
c.recordEventErrorf(canary, "Metric template %s.%s provider %s error: %v",
metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err)
Expand Down
168 changes: 168 additions & 0 deletions pkg/metrics/providers/datadog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package providers

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"time"

flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
)

// https://docs.datadoghq.com/api/
const (
datadogDefaultHost = "https://api.datadoghq.com"

datadogMetricsQueryPath = "/api/v1/query"
datadogAPIKeyValidationPath = "/api/v1/validate"

datadogAPIKeySecretKey = "datadog_api_key"
datadogAPIKeyHeaderKey = "DD-API-KEY"

datadogApplicationKeySecretKey = "datadog_application_key"
datadogApplicationKeyHeaderKey = "DD-APPLICATION-KEY"

datadogFromDeltaMultiplierOnMetricInterval = 10
)

// DatadogProvider executes datadog queries
type DatadogProvider struct {
metricsQueryEndpoint string
apiKeyValidationEndpoint string

timeout time.Duration
apiKey string
applicationKey string
fromDelta int64
}

type datadogResponse struct {
Series []struct {
Pointlist [][]float64 `json:"pointlist"`
}
}

// NewDatadogProvider takes a canary spec, a provider spec and the credentials map, and
// returns a Datadog client ready to execute queries against the API
func NewDatadogProvider(metricInterval string,
provider flaggerv1.MetricTemplateProvider,
credentials map[string][]byte) (*DatadogProvider, error) {

address := provider.Address
if address == "" {
address = datadogDefaultHost
}

dd := DatadogProvider{
timeout: 5 * time.Second,
metricsQueryEndpoint: address + datadogMetricsQueryPath,
apiKeyValidationEndpoint: address + datadogAPIKeyValidationPath,
}

if b, ok := credentials[datadogAPIKeySecretKey]; ok {
dd.apiKey = string(b)
} else {
return nil, fmt.Errorf("datadog credentials does not contain datadog_api_key")
}

if b, ok := credentials[datadogApplicationKeySecretKey]; ok {
dd.applicationKey = string(b)
} else {
return nil, fmt.Errorf("datadog credentials does not contain datadog_application_key")
}

md, err := time.ParseDuration(metricInterval)
if err != nil {
return nil, fmt.Errorf("error parsing metric interval: %s", err.Error())
}

dd.fromDelta = int64(datadogFromDeltaMultiplierOnMetricInterval * md.Seconds())
return &dd, nil
}

// RunQuery executes the datadog query against DatadogProvider.metricsQueryEndpoint
// and returns the the first result as float64
func (p *DatadogProvider) RunQuery(query string) (float64, error) {

req, err := http.NewRequest("GET", p.metricsQueryEndpoint, nil)
if err != nil {
return 0, fmt.Errorf("error http.NewRequest: %s", err.Error())
}

req.Header.Set(datadogAPIKeyHeaderKey, p.apiKey)
req.Header.Set(datadogApplicationKeyHeaderKey, p.applicationKey)
now := time.Now().Unix()
q := req.URL.Query()
q.Add("query", query)
q.Add("from", strconv.FormatInt(now-p.fromDelta, 10))
q.Add("to", strconv.FormatInt(now, 10))
req.URL.RawQuery = q.Encode()

ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
defer cancel()
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return 0, err
}

defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return 0, fmt.Errorf("error reading body: %s", err.Error())
}

if r.StatusCode != http.StatusOK {
return 0, fmt.Errorf("error response: %s", string(b))
}

var res datadogResponse
if err := json.Unmarshal(b, &res); err != nil {
return 0, fmt.Errorf("error unmarshaling result: %s, '%s'", err.Error(), string(b))
}

if len(res.Series) < 1 {
return 0, fmt.Errorf("no values found in response: %s", string(b))
}

s := res.Series[0]
vs := s.Pointlist[len(s.Pointlist)-1]
if len(vs) < 1 {
return 0, fmt.Errorf("no values found in response: %s", string(b))
}

return vs[1], nil
}

// IsOnline calls the Datadog's validation endpoint with api keys
// and returns an error if the validation fails
func (p *DatadogProvider) IsOnline() (bool, error) {
req, err := http.NewRequest("GET", p.apiKeyValidationEndpoint, nil)
if err != nil {
return false, fmt.Errorf("error http.NewRequest: %s", err.Error())
}

req.Header.Add(datadogAPIKeyHeaderKey, p.apiKey)
req.Header.Add(datadogApplicationKeyHeaderKey, p.applicationKey)

ctx, cancel := context.WithTimeout(req.Context(), p.timeout)
defer cancel()
r, err := http.DefaultClient.Do(req.WithContext(ctx))
if err != nil {
return false, err
}
defer r.Body.Close()

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return false, fmt.Errorf("error reading body: %s", err.Error())
}

if r.StatusCode != http.StatusOK {
return false, fmt.Errorf("error response: %s", string(b))
}

return true, nil
}
156 changes: 156 additions & 0 deletions pkg/metrics/providers/datadog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package providers

import (
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"

flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
)

func TestNewDatadogProvider(t *testing.T) {
appKey := "app-key"
apiKey := "api-key"
cs := map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
}

mi := "100s"
md, err := time.ParseDuration(mi)
if err != nil {
t.Fatal(err)
}

dp, err := NewDatadogProvider("100s", flaggerv1.MetricTemplateProvider{}, cs)

if err != nil {
t.Fatal(err)
}

if exp := "https://api.datadoghq.com/api/v1/validate"; dp.apiKeyValidationEndpoint != exp {
t.Fatalf("apiKeyValidationEndpoint expected %s but got %s", exp, dp.apiKeyValidationEndpoint)
}

if exp := "https://api.datadoghq.com/api/v1/query"; dp.metricsQueryEndpoint != exp {
t.Fatalf("metricsQueryEndpoint expected %s but got %s", exp, dp.metricsQueryEndpoint)
}

if exp := int64(md.Seconds() * datadogFromDeltaMultiplierOnMetricInterval); dp.fromDelta != exp {
t.Fatalf("fromDelta expected %d but got %d", exp, dp.fromDelta)
}

if dp.applicationKey != appKey {
t.Fatalf("application key expected %s but got %s", appKey, dp.applicationKey)
}

if dp.apiKey != apiKey {
t.Fatalf("api key expected %s but got %s", apiKey, dp.apiKey)
}
}

func TestDatadogProvider_RunQuery(t *testing.T) {
eq := `avg:system.cpu.user\{*}by{host}`
appKey := "app-key"
apiKey := "api-key"
expected := 1.11111

now := time.Now().Unix()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aq := r.URL.Query().Get("query")
if aq != eq {
t.Errorf("\nquery expected %s bug got %s", eq, aq)
}

if vs := r.Header.Get(datadogApplicationKeyHeaderKey); vs != appKey {
t.Errorf("\n%s header expected %s but got %s", datadogApplicationKeyHeaderKey, appKey, vs)
}
if vs := r.Header.Get(datadogAPIKeyHeaderKey); vs != apiKey {
t.Errorf("\n%s header expected %s but got %s", datadogAPIKeyHeaderKey, apiKey, vs)
}

rf := r.URL.Query().Get("from")
if from, err := strconv.ParseInt(rf, 10, 64); err == nil && from >= now {
t.Errorf("\nfrom %d should be less than %d", from, now)
} else if err != nil {
t.Errorf("\nfailed to parse from: %v", err)
}

rt := r.URL.Query().Get("to")
if to, err := strconv.ParseInt(rt, 10, 64); err == nil && to < now {
t.Errorf("\nto %d should be greater than or equals %d", to, now)
} else if err != nil {
t.Errorf("\nfailed to parse to: %v", err)
}

json := fmt.Sprintf(`{"series": [{"pointlist": [[1577232000000,29325.102158814265],[1577318400000,56294.46758591842],[1577404800000,%f]]}]}`, expected)
w.Write([]byte(json))
}))
defer ts.Close()

dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
if err != nil {
t.Fatal(err)
}

f, err := dp.RunQuery(eq)
if err != nil {
t.Fatal(err)
}

if f != expected {
t.Fatalf("metric value expected %f but got %f", expected, f)
}
}

func TestDatadogProvider_IsOnline(t *testing.T) {
for _, c := range []struct {
code int
errExpected bool
}{
{code: http.StatusOK, errExpected: false},
{code: http.StatusUnauthorized, errExpected: true},
} {
t.Run(fmt.Sprintf("%d", c.code), func(t *testing.T) {
appKey := "app-key"
apiKey := "api-key"
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if vs := r.Header.Get(datadogApplicationKeyHeaderKey); vs != appKey {
t.Errorf("\n%s header expected %s but got %s", datadogApplicationKeyHeaderKey, appKey, vs)
}
if vs := r.Header.Get(datadogAPIKeyHeaderKey); vs != apiKey {
t.Errorf("\n%s header expected %s but got %s", datadogAPIKeyHeaderKey, apiKey, vs)
}
w.WriteHeader(c.code)
}))
defer ts.Close()

dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
if err != nil {
t.Fatal(err)
}

_, err = dp.IsOnline()
if c.errExpected && err == nil {
t.Fatal("error expected but got no error")
} else if !c.errExpected && err != nil {
t.Fatalf("no error expected but got %v", err)
}
})
}
}
16 changes: 12 additions & 4 deletions pkg/metrics/providers/factory.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
package providers

import flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
import (
flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1"
)

type Factory struct {
}
type Factory struct{}

func (factory Factory) Provider(
metricInterval string,
provider flaggerv1.MetricTemplateProvider,
credentials map[string][]byte,
) (Interface, error) {

func (factory Factory) Provider(provider flaggerv1.MetricTemplateProvider, credentials map[string][]byte) (Interface, error) {
switch {
case provider.Type == "prometheus":
return NewPrometheusProvider(provider, credentials)
case provider.Type == "datadog":
return NewDatadogProvider(metricInterval, provider, credentials)
default:
return NewPrometheusProvider(provider, credentials)
}
Expand Down