Skip to content

Commit

Permalink
pkg/metrics/providers: wrap ErrNoValuesFound modify controller accord…
Browse files Browse the repository at this point in the history
…ingly
  • Loading branch information
mathetake committed Mar 7, 2020
1 parent 7fb675e commit 9196e87
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 102 deletions.
4 changes: 2 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (c *Controller) processNextWorkItem() bool {
// Run the syncHandler, passing it the namespace/name string of the
// Foo resource to be synced.
if err := c.syncHandler(key); err != nil {
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
return fmt.Errorf("error syncing '%s': %w", key, err)
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
Expand Down Expand Up @@ -230,7 +230,7 @@ func (c *Controller) syncHandler(key string) error {
_, err := c.flaggerClient.FlaggerV1beta1().Canaries(cd.Namespace).UpdateStatus(cdCopy)
if err != nil {
c.logger.Errorf("%s status condition update error: %v", key, err)
return fmt.Errorf("%s status condition update error: %v", key, err)
return fmt.Errorf("%s status condition update error: %w", key, err)
}
}
}
Expand Down
14 changes: 6 additions & 8 deletions pkg/controller/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@ func (c *Controller) recordEventWarningf(r *flaggerv1.Canary, template string, a

func (c *Controller) sendEventToWebhook(r *flaggerv1.Canary, eventType, template string, args []interface{}) {
webhookOverride := false
if len(r.GetAnalysis().Webhooks) > 0 {
for _, canaryWebhook := range r.GetAnalysis().Webhooks {
if canaryWebhook.Type == flaggerv1.EventHook {
webhookOverride = true
err := CallEventWebhook(r, canaryWebhook.URL, fmt.Sprintf(template, args...), eventType)
if err != nil {
c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Errorf("error sending event to webhook: %s", err)
}
for _, canaryWebhook := range r.GetAnalysis().Webhooks {
if canaryWebhook.Type == flaggerv1.EventHook {
webhookOverride = true
err := CallEventWebhook(r, canaryWebhook.URL, fmt.Sprintf(template, args...), eventType)
if err != nil {
c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Errorf("error sending event to webhook: %s", err)
}
}
}
Expand Down
31 changes: 18 additions & 13 deletions pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -333,7 +334,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh

// strategy: A/B testing
if len(cd.GetAnalysis().Match) > 0 && cd.GetAnalysis().Iterations > 0 {
c.runAB(cd, canaryController, meshRouter, provider)
c.runAB(cd, canaryController, meshRouter)
return
}

Expand All @@ -345,12 +346,13 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh

// strategy: Canary progressive traffic increase
if cd.GetAnalysis().StepWeight > 0 {
c.runCanary(cd, canaryController, meshRouter, provider, mirrored, canaryWeight, primaryWeight, maxWeight)
c.runCanary(cd, canaryController, meshRouter, mirrored, canaryWeight, primaryWeight, maxWeight)
}

}

func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool, canaryWeight int, primaryWeight int, maxWeight int) {
func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary.Controller,
meshRouter router.Interface, mirrored bool, canaryWeight int, primaryWeight int, maxWeight int) {
primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name)

// increase traffic weight
Expand Down Expand Up @@ -420,7 +422,8 @@ func (c *Controller) runCanary(canary *flaggerv1.Canary, canaryController canary
}
}

func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string) {
func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Controller,
meshRouter router.Interface) {
primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name)

// route traffic to canary and increment iterations
Expand Down Expand Up @@ -462,7 +465,8 @@ func (c *Controller) runAB(canary *flaggerv1.Canary, canaryController canary.Con
}
}

func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController canary.Controller, meshRouter router.Interface, provider string, mirrored bool) {
func (c *Controller) runBlueGreen(canary *flaggerv1.Canary, canaryController canary.Controller,
meshRouter router.Interface, provider string, mirrored bool) {
primaryName := fmt.Sprintf("%s-primary", canary.Spec.TargetRef.Name)

// increment iterations
Expand Down Expand Up @@ -820,9 +824,10 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
if metric.Name == "request-success-rate" {
val, err := observer.GetRequestSuccessRate(toMetricModel(canary, metric.Interval))
if err != nil {
if strings.Contains(err.Error(), "no values found") {
c.recordEventWarningf(canary, "Halt advancement no values found for %s metric %s probably %s.%s is not receiving traffic",
metricsProvider, metric.Name, canary.Spec.TargetRef.Name, canary.Namespace)
if errors.Is(err, observers.ErrNoValuesFound) {
c.recordEventWarningf(canary,
"Halt advancement no values found for %s metric %s probably %s.%s is not receiving traffic: %v",
metricsProvider, metric.Name, canary.Spec.TargetRef.Name, canary.Namespace, err)
} else {
c.recordEventErrorf(canary, "Prometheus query failed: %v", err)
}
Expand Down Expand Up @@ -851,7 +856,7 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
if metric.Name == "request-duration" {
val, err := observer.GetRequestDuration(toMetricModel(canary, metric.Interval))
if err != nil {
if strings.Contains(err.Error(), "no values found") {
if errors.Is(err, observers.ErrNoValuesFound) {
c.recordEventWarningf(canary, "Halt advancement no values found for %s metric %s probably %s.%s is not receiving traffic",
metricsProvider, metric.Name, canary.Spec.TargetRef.Name, canary.Namespace)
} else {
Expand Down Expand Up @@ -882,7 +887,7 @@ func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool {
if metric.Query != "" {
val, err := observerFactory.Client.RunQuery(metric.Query)
if err != nil {
if strings.Contains(err.Error(), "no values found") {
if errors.Is(err, observers.ErrNoValuesFound) {
c.recordEventWarningf(canary, "Halt advancement no values found for metric: %s",
metric.Name)
} else {
Expand Down Expand Up @@ -955,9 +960,9 @@ func (c *Controller) runMetricChecks(canary *flaggerv1.Canary) bool {

val, err := provider.RunQuery(query)
if err != nil {
if strings.Contains(err.Error(), "no values found") {
c.recordEventWarningf(canary, "Halt advancement no values found for custom metric: %s",
metric.Name)
if errors.Is(err, providers.ErrNoValuesFound) {
c.recordEventWarningf(canary, "Halt advancement no values found for custom metric: %s: %v",
metric.Name, err)
} else {
c.recordEventErrorf(canary, "Metric query failed for %s: %v", metric.Name, err)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/metrics/observers/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package observers

import "errors"

var (
ErrNoValuesFound = errors.New("no values found")
)
6 changes: 3 additions & 3 deletions pkg/metrics/providers/cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ func (p *CloudWatchProvider) RunQuery(query string) (float64, error) {

mr := res.MetricDataResults
if len(mr) < 1 {
return 0, fmt.Errorf("no values found in response: %s", res.String())
return 0, fmt.Errorf("invalid response: %s: %w", res.String(), ErrNoValuesFound)
}

vs := res.MetricDataResults[0].Values
vs := mr[0].Values
if len(vs) < 1 {
return 0, fmt.Errorf("no values found in response: %s", res.String())
return 0, fmt.Errorf("invalid reponse %s: %w", res.String(), ErrNoValuesFound)
}

return aws.Float64Value(vs[0]), nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/metrics/providers/cloudwatch_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package providers

import (
"errors"
"net/http"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -146,13 +146,13 @@ func TestCloudWatchProvider_RunQuery(t *testing.T) {

_, err := p.RunQuery(query)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "no values"))
require.True(t, errors.Is(err, ErrNoValuesFound))

p = CloudWatchProvider{client: cloudWatchClientMock{
o: &cloudwatch.GetMetricDataOutput{}}}

_, err = p.RunQuery(query)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "no values"))
require.True(t, errors.Is(err, ErrNoValuesFound))
})
}
26 changes: 15 additions & 11 deletions pkg/metrics/providers/datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewDatadogProvider(metricInterval string,

md, err := time.ParseDuration(metricInterval)
if err != nil {
return nil, fmt.Errorf("error parsing metric interval: %s", err.Error())
return nil, fmt.Errorf("error parsing metric interval: %w", err)
}

dd.fromDelta = int64(datadogFromDeltaMultiplierOnMetricInterval * md.Seconds())
Expand All @@ -89,7 +89,7 @@ func (p *DatadogProvider) RunQuery(query string) (float64, error) {

req, err := http.NewRequest("GET", p.metricsQueryEndpoint, nil)
if err != nil {
return 0, fmt.Errorf("error http.NewRequest: %s", err.Error())
return 0, fmt.Errorf("error http.NewRequest: %w", err)
}

req.Header.Set(datadogAPIKeyHeaderKey, p.apiKey)
Expand All @@ -111,26 +111,30 @@ func (p *DatadogProvider) RunQuery(query string) (float64, error) {
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
return 0, fmt.Errorf("error reading body: %s", err.Error())
return 0, fmt.Errorf("error reading body: %w", err)
}

if r.StatusCode != http.StatusOK {
return 0, fmt.Errorf("error response: %s", string(b))
return 0, fmt.Errorf("error response: %s: %w", string(b), err)
}

var res datadogResponse
if err := json.Unmarshal(b, &res); err != nil {
return 0, fmt.Errorf("error unmarshaling result: %s, '%s'", err.Error(), string(b))
return 0, fmt.Errorf("error unmarshaling result '%s': %w", string(b), err)
}

if len(res.Series) < 1 {
return 0, fmt.Errorf("no values found in response: %s", string(b))
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
}

s := res.Series[0]
vs := s.Pointlist[len(s.Pointlist)-1]
pl := res.Series[0].Pointlist
if len(pl) < 1 {
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
}

vs := pl[len(pl)-1]
if len(vs) < 1 {
return 0, fmt.Errorf("no values found in response: %s", string(b))
return 0, fmt.Errorf("invalid response: %s: %w", string(b), ErrNoValuesFound)
}

return vs[1], nil
Expand All @@ -141,7 +145,7 @@ func (p *DatadogProvider) RunQuery(query string) (float64, error) {
func (p *DatadogProvider) IsOnline() (bool, error) {
req, err := http.NewRequest("GET", p.apiKeyValidationEndpoint, nil)
if err != nil {
return false, fmt.Errorf("error http.NewRequest: %s", err.Error())
return false, fmt.Errorf("error http.NewRequest: %w", err)
}

req.Header.Add(datadogAPIKeyHeaderKey, p.apiKey)
Expand All @@ -157,7 +161,7 @@ func (p *DatadogProvider) IsOnline() (bool, error) {

b, err := ioutil.ReadAll(r.Body)
if err != nil {
return false, fmt.Errorf("error reading body: %s", err.Error())
return false, fmt.Errorf("error reading body: %w", err)
}

if r.StatusCode != http.StatusOK {
Expand Down
93 changes: 57 additions & 36 deletions pkg/metrics/providers/datadog_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package providers

import (
"errors"
"fmt"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -36,45 +37,65 @@ func TestNewDatadogProvider(t *testing.T) {
}

func TestDatadogProvider_RunQuery(t *testing.T) {
eq := `avg:system.cpu.user\{*}by{host}`
appKey := "app-key"
apiKey := "api-key"
expected := 1.11111

now := time.Now().Unix()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aq := r.URL.Query().Get("query")
assert.Equal(t, eq, aq)
assert.Equal(t, appKey, r.Header.Get(datadogApplicationKeyHeaderKey))
assert.Equal(t, apiKey, r.Header.Get(datadogAPIKeyHeaderKey))

from, err := strconv.ParseInt(r.URL.Query().Get("from"), 10, 64)
if assert.NoError(t, err) {
assert.Less(t, from, now)
}

to, err := strconv.ParseInt(r.URL.Query().Get("to"), 10, 64)
if assert.NoError(t, err) {
assert.GreaterOrEqual(t, to, now)
}

json := fmt.Sprintf(`{"series": [{"pointlist": [[1577232000000,29325.102158814265],[1577318400000,56294.46758591842],[1577404800000,%f]]}]}`, expected)
w.Write([]byte(json))
}))
defer ts.Close()

dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
require.NoError(t, err)
t.Run("ok", func(t *testing.T) {
expected := 1.11111
eq := `avg:system.cpu.user{*}by{host}`
now := time.Now().Unix()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
aq := r.URL.Query().Get("query")
assert.Equal(t, eq, aq)
assert.Equal(t, appKey, r.Header.Get(datadogApplicationKeyHeaderKey))
assert.Equal(t, apiKey, r.Header.Get(datadogAPIKeyHeaderKey))

from, err := strconv.ParseInt(r.URL.Query().Get("from"), 10, 64)
if assert.NoError(t, err) {
assert.Less(t, from, now)
}

f, err := dp.RunQuery(eq)
require.NoError(t, err)
assert.Equal(t, expected, f)
to, err := strconv.ParseInt(r.URL.Query().Get("to"), 10, 64)
if assert.NoError(t, err) {
assert.GreaterOrEqual(t, to, now)
}

json := fmt.Sprintf(`{"series": [{"pointlist": [[1577232000000,29325.102158814265],[1577318400000,56294.46758591842],[1577404800000,%f]]}]}`, expected)
w.Write([]byte(json))
}))
defer ts.Close()

dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
require.NoError(t, err)

f, err := dp.RunQuery(eq)
require.NoError(t, err)
assert.Equal(t, expected, f)
})

t.Run("no values", func(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json := fmt.Sprintf(`{"series": [{"pointlist": []}]}`)
w.Write([]byte(json))
}))
defer ts.Close()

dp, err := NewDatadogProvider("1m",
flaggerv1.MetricTemplateProvider{Address: ts.URL},
map[string][]byte{
datadogApplicationKeySecretKey: []byte(appKey),
datadogAPIKeySecretKey: []byte(apiKey),
},
)
require.NoError(t, err)
_, err = dp.RunQuery("")
require.True(t, errors.Is(err, ErrNoValuesFound))
})
}

func TestDatadogProvider_IsOnline(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions pkg/metrics/providers/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package providers

import "errors"

var (
ErrNoValuesFound = errors.New("no values found")
)
2 changes: 1 addition & 1 deletion pkg/metrics/providers/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (p *PrometheusProvider) RunQuery(query string) (float64, error) {
}
}
if value == nil {
return 0, fmt.Errorf("no values found")
return 0, fmt.Errorf("%w", ErrNoValuesFound)
}

return *value, nil
Expand Down
Loading

0 comments on commit 9196e87

Please sign in to comment.