Skip to content

Commit

Permalink
Metrics provider for deployments and services behind Envoy
Browse files Browse the repository at this point in the history
Assumes `envoy:smi` as the mesh provider name as I've successfully tested the progressive delivery for Envoy + Crossover with it.

This enhances Flagger to translate it to the metrics provider name of `envoy` for deployment targets, or `envoy:service` for service targets.

The `envoy` metrics provider is equivalent to `appmesh`, as both relies on the same set of standard metrics exposed by Envoy itself.

The `envoy:service` is almost the same as the `envoy` provider, but removing the condition on pod name, as we only need to filter on the backing service name = envoy_cluster_name. We don't consider other Envoy xDS implementations that uses anything that is different to original servicen ames as `envoy_cluster_name`, for now.

Ref #385
  • Loading branch information
mumoshu committed Nov 30, 2019
1 parent b02a6da commit 6661406
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 12 deletions.
2 changes: 1 addition & 1 deletion cmd/flagger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func main() {
logger.Infof("Watching namespace %s", namespace)
}

observerFactory, err := metrics.NewFactory(metricsServer, meshProvider, 5*time.Second)
observerFactory, err := metrics.NewFactory(metricsServer, 5*time.Second)
if err != nil {
logger.Fatalf("Error building prometheus client: %s", err.Error())
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func SetupMocks(c *flaggerv1.Canary) Mocks {
rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", logger, flaggerClient)

// init observer
observerFactory, _ := metrics.NewFactory("fake", "istio", 5*time.Second)
observerFactory, _ := metrics.NewFactory("fake", 5*time.Second)

// init canary factory
configTracker := canary.ConfigTracker{
Expand Down
17 changes: 15 additions & 2 deletions pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ import (
"github.com/weaveworks/flagger/pkg/router"
)

const (
MetricsProviderServiceSuffix = ":service"
)

// scheduleCanaries synchronises the canary map with the jobs map,
// for new canaries new jobs are created and started
// for the removed canaries the jobs are stopped and deleted
Expand Down Expand Up @@ -747,10 +751,19 @@ func (c *Controller) analyseCanary(r *flaggerv1.Canary) bool {
if r.Spec.Provider != "" {
metricsProvider = r.Spec.Provider

// set the metrics server to Linkerd Prometheus when Linkerd is the default mesh provider
// set the metrics provider to Linkerd Prometheus when Linkerd is the default mesh provider
if strings.Contains(c.meshProvider, "linkerd") {
metricsProvider = "linkerd"
}

// set the metrics provider to Envoy Prometheus when Envoy is the default mesh provider
if strings.Contains(c.meshProvider, "envoy") {
metricsProvider = "envoy"
}
}
// set the metrics provider to query Prometheus for the canary Kubernetes service if the canary target is Service
if r.Spec.TargetRef.Kind == "Service" {
metricsProvider = metricsProvider + MetricsProviderServiceSuffix
}

// create observer based on the mesh provider
Expand All @@ -761,7 +774,7 @@ func (c *Controller) analyseCanary(r *flaggerv1.Canary) bool {
if r.Spec.MetricsServer != "" {
metricsServer = r.Spec.MetricsServer
var err error
observerFactory, err = metrics.NewFactory(metricsServer, metricsProvider, 5*time.Second)
observerFactory, err = metrics.NewFactory(metricsServer, 5*time.Second)
if err != nil {
c.recordEventErrorf(r, "Error building Prometheus client for %s %v", r.Spec.MetricsServer, err)
return false
Expand Down
73 changes: 73 additions & 0 deletions pkg/metrics/envoy_service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package metrics

import (
"time"
)

var envoyServiceQueries = map[string]string{
"request-success-rate": `
sum(
rate(
envoy_cluster_upstream_rq{
kubernetes_namespace="{{ .Namespace }}",
envoy_cluster_name="{{ .Name }}-canary",
envoy_response_code!~"5.*"
}[{{ .Interval }}]
)
)
/
sum(
rate(
envoy_cluster_upstream_rq{
kubernetes_namespace="{{ .Namespace }}",
envoy_cluster_name="{{ .Name }}-canary"
}[{{ .Interval }}]
)
)
* 100`,
"request-duration": `
histogram_quantile(
0.99,
sum(
rate(
envoy_cluster_upstream_rq_time_bucket{
kubernetes_namespace="{{ .Namespace }}",
envoy_cluster_name="{{ .Name }}-canary"
}[{{ .Interval }}]
)
) by (le)
)`,
}

type EnvoyServiceObserver struct {
client *PrometheusClient
}

func (ob *EnvoyServiceObserver) GetRequestSuccessRate(name string, namespace string, interval string) (float64, error) {
query, err := ob.client.RenderQuery(name, namespace, interval, envoyServiceQueries["request-success-rate"])
if err != nil {
return 0, err
}

value, err := ob.client.RunQuery(query)
if err != nil {
return 0, err
}

return value, nil
}

func (ob *EnvoyServiceObserver) GetRequestDuration(name string, namespace string, interval string) (time.Duration, error) {
query, err := ob.client.RenderQuery(name, namespace, interval, envoyServiceQueries["request-duration"])
if err != nil {
return 0, err
}

value, err := ob.client.RunQuery(query)
if err != nil {
return 0, err
}

ms := time.Duration(int64(value)) * time.Millisecond
return ms, nil
}
74 changes: 74 additions & 0 deletions pkg/metrics/envoy_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package metrics

import (
"net/http"
"net/http/httptest"
"testing"
"time"
)

func TestEnvoyServiceObserver_GetRequestSuccessRate(t *testing.T) {
expected := ` sum( rate( envoy_cluster_upstream_rq{ kubernetes_namespace="default", envoy_cluster_name="podinfo-canary", envoy_response_code!~"5.*" }[1m] ) ) / sum( rate( envoy_cluster_upstream_rq{ kubernetes_namespace="default", envoy_cluster_name="podinfo-canary" }[1m] ) ) * 100`

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
promql := r.URL.Query()["query"][0]
if promql != expected {
t.Errorf("\nGot %s \nWanted %s", promql, expected)
}

json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}`
w.Write([]byte(json))
}))
defer ts.Close()

client, err := NewPrometheusClient(ts.URL, time.Second)
if err != nil {
t.Fatal(err)
}

observer := &EnvoyServiceObserver{
client: client,
}

val, err := observer.GetRequestSuccessRate("podinfo", "default", "1m")
if err != nil {
t.Fatal(err.Error())
}

if val != 100 {
t.Errorf("Got %v wanted %v", val, 100)
}
}

func TestEnvoyServiceObserver_GetRequestDuration(t *testing.T) {
expected := ` histogram_quantile( 0.99, sum( rate( envoy_cluster_upstream_rq_time_bucket{ kubernetes_namespace="default", envoy_cluster_name="podinfo-canary" }[1m] ) ) by (le) )`

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
promql := r.URL.Query()["query"][0]
if promql != expected {
t.Errorf("\nGot %s \nWanted %s", promql, expected)
}

json := `{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1,"100"]}]}}`
w.Write([]byte(json))
}))
defer ts.Close()

client, err := NewPrometheusClient(ts.URL, time.Second)
if err != nil {
t.Fatal(err)
}

observer := &EnvoyServiceObserver{
client: client,
}

val, err := observer.GetRequestDuration("podinfo", "default", "1m")
if err != nil {
t.Fatal(err.Error())
}

if val != 100*time.Millisecond {
t.Errorf("Got %v wanted %v", val, 100*time.Millisecond)
}
}
14 changes: 6 additions & 8 deletions pkg/metrics/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,17 @@ import (
)

type Factory struct {
MeshProvider string
Client *PrometheusClient
Client *PrometheusClient
}

func NewFactory(metricsServer string, meshProvider string, timeout time.Duration) (*Factory, error) {
func NewFactory(metricsServer string, timeout time.Duration) (*Factory, error) {
client, err := NewPrometheusClient(metricsServer, timeout)
if err != nil {
return nil, err
}

return &Factory{
MeshProvider: meshProvider,
Client: client,
Client: client,
}, nil
}

Expand All @@ -32,7 +30,7 @@ func (factory Factory) Observer(provider string) Interface {
return &HttpObserver{
client: factory.Client,
}
case provider == "appmesh":
case provider == "appmesh", provider == "envoy":
return &EnvoyObserver{
client: factory.Client,
}
Expand All @@ -44,8 +42,8 @@ func (factory Factory) Observer(provider string) Interface {
return &GlooObserver{
client: factory.Client,
}
case provider == "smi:linkerd":
return &LinkerdObserver{
case provider == "appmesh:service", provider == "envoy:service":
return &EnvoyServiceObserver{
client: factory.Client,
}
case provider == "linkerd":
Expand Down

0 comments on commit 6661406

Please sign in to comment.