From 5d84596bc0e7e68f9e45d1fec218b97693e7c017 Mon Sep 17 00:00:00 2001 From: mathetake Date: Fri, 15 May 2020 19:35:40 +0900 Subject: [PATCH 1/4] pkg/controller: check metrics server's availability during initialization --- pkg/controller/scheduler.go | 12 ++-- pkg/controller/scheduler_common_test.go | 33 ---------- .../scheduler_daemonset_fixture_test.go | 4 +- .../scheduler_deployment_fixture_test.go | 4 +- pkg/controller/scheduler_deployment_test.go | 2 - pkg/controller/scheduler_metrics.go | 61 +++++++++++++++++++ pkg/controller/scheduler_test.go | 53 ++++++++++++++++ 7 files changed, 126 insertions(+), 43 deletions(-) delete mode 100644 pkg/controller/scheduler_common_test.go create mode 100644 pkg/controller/scheduler_test.go diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index 69aa9ae16..565466aba 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -14,10 +14,6 @@ import ( "github.com/weaveworks/flagger/pkg/router" ) -const ( - MetricsProviderServiceSuffix = ":service" -) - // scheduleCanaries synchronises the canary map with the jobs map, // for new canaries new jobs are created and started // for the removed canaries the jobs are stopped and deleted @@ -119,6 +115,14 @@ func (c *Controller) advanceCanary(name string, namespace string) { return } + // check metric servers' availability + if !cd.SkipAnalysis() && (cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing) { + if err := c.checkMetricProviderAvailability(cd); err != nil { + c.recordEventErrorf(cd, "Error checking metric providers: %v", err) + return + } + } + // init mesh router meshRouter := c.routerFactory.MeshRouter(provider, labelSelector) diff --git a/pkg/controller/scheduler_common_test.go b/pkg/controller/scheduler_common_test.go deleted file mode 100644 index 970fcec96..000000000 --- a/pkg/controller/scheduler_common_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package controller - -import ( - "context" - "fmt" - - metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" - - flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" - clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" -) - -func assertPhase(flaggerClient clientset.Interface, canary string, phase flaggerv1.CanaryPhase) error { - c, err := flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), canary, metav1.GetOptions{}) - if err != nil { - return err - } - - if c.Status.Phase != phase { - return fmt.Errorf("Got canary state %s wanted %s", c.Status.Phase, phase) - } - - return nil -} - -func alwaysReady() bool { - return true -} - -func toFloatPtr(val int) *float64 { - v := float64(val) - return &v -} diff --git a/pkg/controller/scheduler_daemonset_fixture_test.go b/pkg/controller/scheduler_daemonset_fixture_test.go index ac620814f..3c85484bb 100644 --- a/pkg/controller/scheduler_daemonset_fixture_test.go +++ b/pkg/controller/scheduler_daemonset_fixture_test.go @@ -79,7 +79,7 @@ func newDaemonSetFixture(c *flaggerv1.Canary) daemonSetFixture { rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", "", logger, flaggerClient) // init observer - observerFactory, _ := observers.NewFactory("fake") + observerFactory, _ := observers.NewFactory(testMetricsServerURL) // init canary factory configTracker := &canary.ConfigTracker{ @@ -616,7 +616,7 @@ func newDaemonSetTestService() *corev1.Service { func newDaemonSetTestMetricTemplate() *flaggerv1.MetricTemplate { provider := flaggerv1.MetricTemplateProvider{ Type: "prometheus", - Address: "fake", + Address: testMetricsServerURL, SecretRef: &corev1.LocalObjectReference{ Name: "podinfo-secret-env", }, diff --git a/pkg/controller/scheduler_deployment_fixture_test.go b/pkg/controller/scheduler_deployment_fixture_test.go index d1bfb3ba2..e0a957585 100644 --- a/pkg/controller/scheduler_deployment_fixture_test.go +++ b/pkg/controller/scheduler_deployment_fixture_test.go @@ -107,7 +107,7 @@ func newDeploymentFixture(c 
*flaggerv1.Canary) fixture { rf := router.NewFactory(nil, kubeClient, flaggerClient, "annotationsPrefix", "", logger, flaggerClient) // init observer - observerFactory, _ := observers.NewFactory("fake") + observerFactory, _ := observers.NewFactory(testMetricsServerURL) // init canary factory configTracker := &canary.ConfigTracker{ @@ -708,7 +708,7 @@ func newDeploymentTestHPA() *hpav2.HorizontalPodAutoscaler { func newDeploymentTestMetricTemplate() *flaggerv1.MetricTemplate { provider := flaggerv1.MetricTemplateProvider{ Type: "prometheus", - Address: "fake", + Address: testMetricsServerURL, SecretRef: &corev1.LocalObjectReference{ Name: "podinfo-secret-env", }, diff --git a/pkg/controller/scheduler_deployment_test.go b/pkg/controller/scheduler_deployment_test.go index f9f04dd60..44ac59d7a 100644 --- a/pkg/controller/scheduler_deployment_test.go +++ b/pkg/controller/scheduler_deployment_test.go @@ -84,11 +84,9 @@ func TestScheduler_DeploymentRollback(t *testing.T) { // run metric checks mocks.ctrl.advanceCanary("podinfo", "default") - require.NoError(t, err) // finalise analysis mocks.ctrl.advanceCanary("podinfo", "default") - require.NoError(t, err) // check status c, err = mocks.flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), "podinfo", metav1.GetOptions{}) diff --git a/pkg/controller/scheduler_metrics.go b/pkg/controller/scheduler_metrics.go index 0c370ce6a..2f30bc98f 100644 --- a/pkg/controller/scheduler_metrics.go +++ b/pkg/controller/scheduler_metrics.go @@ -3,6 +3,7 @@ package controller import ( "context" "errors" + "fmt" "strings" "time" @@ -13,6 +14,66 @@ import ( "github.com/weaveworks/flagger/pkg/metrics/providers" ) +const ( + MetricsProviderServiceSuffix = ":service" +) + +// to be called during canary initialization +func (c *Controller) checkMetricProviderAvailability(canary *flaggerv1.Canary) error { + for _, metric := range canary.GetAnalysis().Metrics { + if metric.Name == "request-success-rate" || metric.Name == 
"request-duration" { + observerFactory := c.observerFactory + if canary.Spec.MetricsServer != "" { + var err error + observerFactory, err = observers.NewFactory(canary.Spec.MetricsServer) + if err != nil { + return fmt.Errorf("error building Prometheus client for %s %v", canary.Spec.MetricsServer, err) + } + } + if ok, err := observerFactory.Client.IsOnline(); !ok || err != nil { + return fmt.Errorf("prometheus not avaiable: %v", err) + } + continue + } + + if metric.TemplateRef != nil { + namespace := canary.Namespace + if metric.TemplateRef.Namespace != "" { + namespace = metric.TemplateRef.Namespace + } + + template, err := c.flaggerInformers.MetricInformer.Lister().MetricTemplates(namespace).Get(metric.TemplateRef.Name) + if err != nil { + return fmt.Errorf("metric template %s.%s error: %v", metric.TemplateRef.Name, namespace, err) + } + + var credentials map[string][]byte + if template.Spec.Provider.SecretRef != nil { + secret, err := c.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), template.Spec.Provider.SecretRef.Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("metric template %s.%s secret %s error: %v", + metric.TemplateRef.Name, namespace, template.Spec.Provider.SecretRef.Name, err) + } + credentials = secret.Data + } + + factory := providers.Factory{} + provider, err := factory.Provider(metric.Interval, template.Spec.Provider, credentials) + if err != nil { + return fmt.Errorf("metric template %s.%s provider %s error: %v", + metric.TemplateRef.Name, namespace, template.Spec.Provider.Type, err) + } + + if ok, err := provider.IsOnline(); !ok || err != nil { + return fmt.Errorf("%v in metric tempalte %s.%s not avaiable: %v", template.Spec.Provider.Type, + template.Name, template.Namespace, err) + } + } + } + c.recordEventInfof(canary, "all the metrics providers are available!") + return nil +} + func (c *Controller) runBuiltinMetricChecks(canary *flaggerv1.Canary) bool { // override the global provider if one is specified in the 
canary spec var metricsProvider string diff --git a/pkg/controller/scheduler_test.go b/pkg/controller/scheduler_test.go new file mode 100644 index 000000000..bf5a517c1 --- /dev/null +++ b/pkg/controller/scheduler_test.go @@ -0,0 +1,53 @@ +package controller + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" + clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var testMetricsServerURL string + +func TestMain(m *testing.M) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Query()["query"][0] == "vector(1)" { + // for IsOnline invoked during canary initialization + w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"1"]}]}}`)) + return + } + w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{},"value":[1545905245.458,"100"]}]}}`)) + })) + + testMetricsServerURL = ts.URL + defer ts.Close() + os.Exit(m.Run()) +} + +func assertPhase(flaggerClient clientset.Interface, canary string, phase flaggerv1.CanaryPhase) error { + c, err := flaggerClient.FlaggerV1beta1().Canaries("default").Get(context.TODO(), canary, metav1.GetOptions{}) + if err != nil { + return err + } + + if c.Status.Phase != phase { + return fmt.Errorf("got canary state %s wanted %s", c.Status.Phase, phase) + } + + return nil +} + +func alwaysReady() bool { + return true +} + +func toFloatPtr(val int) *float64 { + v := float64(val) + return &v +} From ad73643e4a638fdc8b494bc1c0ed39c4d4b66660 Mon Sep 17 00:00:00 2001 From: mathetake Date: Fri, 15 May 2020 19:36:11 +0900 Subject: [PATCH 2/4] pkg/metrics/providers: delete fake value --- pkg/metrics/providers/prometheus.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/metrics/providers/prometheus.go 
b/pkg/metrics/providers/prometheus.go index f99131c7d..64f60d408 100644 --- a/pkg/metrics/providers/prometheus.go +++ b/pkg/metrics/providers/prometheus.go @@ -69,10 +69,6 @@ func NewPrometheusProvider(provider flaggerv1.MetricTemplateProvider, credential // RunQuery executes the promQL query and returns the the first result as float64 func (p *PrometheusProvider) RunQuery(query string) (float64, error) { - if p.url.String() == "fake" { - return 100, nil - } - query = url.QueryEscape(p.trimQuery(query)) u, err := url.Parse(fmt.Sprintf("./api/v1/query?query=%s", query)) if err != nil { From a17e8b4794871a07a6448c7e69efa07c51ea89de Mon Sep 17 00:00:00 2001 From: mathetake Date: Fri, 15 May 2020 21:44:35 +0900 Subject: [PATCH 3/4] not return even if checkMetricProviderAvailability fails --- pkg/controller/scheduler.go | 1 - pkg/controller/scheduler_test.go | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index 565466aba..e636c07db 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -119,7 +119,6 @@ func (c *Controller) advanceCanary(name string, namespace string) { if !cd.SkipAnalysis() && (cd.Status.Phase == "" || cd.Status.Phase == flaggerv1.CanaryPhaseInitializing) { if err := c.checkMetricProviderAvailability(cd); err != nil { c.recordEventErrorf(cd, "Error checking metric providers: %v", err) - return } } diff --git a/pkg/controller/scheduler_test.go b/pkg/controller/scheduler_test.go index bf5a517c1..1ad82dd80 100644 --- a/pkg/controller/scheduler_test.go +++ b/pkg/controller/scheduler_test.go @@ -10,6 +10,7 @@ import ( flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) From e0de9d0afa756bf307fccbe6d404e3aadcbbcc9f Mon Sep 17 00:00:00 2001 From: mathetake Date: Sat, 16 May 2020 11:12:45 +0900 Subject: [PATCH 4/4] 
pkg/controller: add unit test for checkMetricProviderAvailability --- pkg/controller/scheduler_metrics_test.go | 53 ++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 pkg/controller/scheduler_metrics_test.go diff --git a/pkg/controller/scheduler_metrics_test.go b/pkg/controller/scheduler_metrics_test.go new file mode 100644 index 000000000..36117ac4c --- /dev/null +++ b/pkg/controller/scheduler_metrics_test.go @@ -0,0 +1,53 @@ +package controller + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "k8s.io/client-go/tools/record" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1beta1" + "github.com/weaveworks/flagger/pkg/metrics/observers" +) + +func TestController_checkMetricProviderAvailability(t *testing.T) { + t.Run("builtin", func(t *testing.T) { + // ok + analysis := &flaggerv1.CanaryAnalysis{Metrics: []flaggerv1.CanaryMetric{{Name: "request-success-rate"}}} + canary := &flaggerv1.Canary{Spec: flaggerv1.CanarySpec{Analysis: analysis}} + obs, err := observers.NewFactory(testMetricsServerURL) + require.NoError(t, err) + ctrl := Controller{observerFactory: obs, logger: zap.S(), eventRecorder: &record.FakeRecorder{}} + require.NoError(t, ctrl.checkMetricProviderAvailability(canary)) + + // error + ctrl.observerFactory, err = observers.NewFactory("http://non-exist") + require.NoError(t, err) + require.Error(t, ctrl.checkMetricProviderAvailability(canary)) + + // ok + canary.Spec.MetricsServer = testMetricsServerURL + require.NoError(t, ctrl.checkMetricProviderAvailability(canary)) + }) + + t.Run("templateRef", func(t *testing.T) { + ctrl := newDeploymentFixture(nil).ctrl + + // error (not found) + analysis := &flaggerv1.CanaryAnalysis{Metrics: []flaggerv1.CanaryMetric{{ + Name: "", TemplateRef: &flaggerv1.CrossNamespaceObjectReference{ + Name: "non-exist", Namespace: "default", + }, + }}} + canary := &flaggerv1.Canary{Spec: flaggerv1.CanarySpec{Analysis: analysis}} + require.Error(t, 
ctrl.checkMetricProviderAvailability(canary)) + + // ok + canary.Spec.Analysis.Metrics[0].TemplateRef = &flaggerv1.CrossNamespaceObjectReference{ + Name: "envoy", + Namespace: "default", + } + require.NoError(t, ctrl.checkMetricProviderAvailability(canary)) + }) +}