From a09dc2cbd8ace3756c262cfaa2f66674e791e2bf Mon Sep 17 00:00:00 2001 From: stefanprodan Date: Mon, 15 Apr 2019 11:25:45 +0300 Subject: [PATCH 1/4] Rename logging package --- cmd/flagger/main.go | 4 ++-- cmd/loadtester/main.go | 4 ++-- pkg/loadtester/runner_test.go | 4 ++-- pkg/loadtester/task_ngrinder_test.go | 4 ++-- pkg/{logging => logger}/logger.go | 14 +------------- pkg/router/router_test.go | 4 ++-- 6 files changed, 11 insertions(+), 23 deletions(-) rename pkg/{logging => logger}/logger.go (88%) diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go index ed2fa6598..e0d702f46 100644 --- a/cmd/flagger/main.go +++ b/cmd/flagger/main.go @@ -6,7 +6,7 @@ import ( clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" informers "github.com/weaveworks/flagger/pkg/client/informers/externalversions" "github.com/weaveworks/flagger/pkg/controller" - "github.com/weaveworks/flagger/pkg/logging" + "github.com/weaveworks/flagger/pkg/logger" "github.com/weaveworks/flagger/pkg/metrics" "github.com/weaveworks/flagger/pkg/notifier" "github.com/weaveworks/flagger/pkg/server" @@ -58,7 +58,7 @@ func init() { func main() { flag.Parse() - logger, err := logging.NewLoggerWithEncoding(logLevel, zapEncoding) + logger, err := logger.NewLoggerWithEncoding(logLevel, zapEncoding) if err != nil { log.Fatalf("Error creating logger: %v", err) } diff --git a/cmd/loadtester/main.go b/cmd/loadtester/main.go index dfdafdbce..a3e98d3ac 100644 --- a/cmd/loadtester/main.go +++ b/cmd/loadtester/main.go @@ -3,7 +3,7 @@ package main import ( "flag" "github.com/weaveworks/flagger/pkg/loadtester" - "github.com/weaveworks/flagger/pkg/logging" + "github.com/weaveworks/flagger/pkg/logger" "github.com/weaveworks/flagger/pkg/signals" "go.uber.org/zap" "log" @@ -30,7 +30,7 @@ func init() { func main() { flag.Parse() - logger, err := logging.NewLoggerWithEncoding(logLevel, zapEncoding) + logger, err := logger.NewLoggerWithEncoding(logLevel, zapEncoding) if err != nil { log.Fatalf("Error creating logger: %v", err) } diff --git a/pkg/loadtester/runner_test.go b/pkg/loadtester/runner_test.go index 1b43509bd..1c7ab9c7f 100644 --- a/pkg/loadtester/runner_test.go +++ b/pkg/loadtester/runner_test.go @@ -1,14 +1,14 @@ package loadtester import ( - "github.com/weaveworks/flagger/pkg/logging" + "github.com/weaveworks/flagger/pkg/logger" "testing" "time" ) func TestTaskRunner_Start(t *testing.T) { stop := make(chan struct{}) - logger, _ := logging.NewLogger("debug") + logger, _ := logger.NewLogger("debug") tr := NewTaskRunner(logger, time.Hour) go tr.Start(10*time.Millisecond, stop) diff --git a/pkg/loadtester/task_ngrinder_test.go b/pkg/loadtester/task_ngrinder_test.go index 3402ba881..699aea72d 100644 --- a/pkg/loadtester/task_ngrinder_test.go +++ b/pkg/loadtester/task_ngrinder_test.go @@ -3,7 +3,7 @@ package loadtester import ( "context" "fmt" - "github.com/weaveworks/flagger/pkg/logging" + "github.com/weaveworks/flagger/pkg/logger" "gopkg.in/h2non/gock.v1" "testing" "time" @@ -12,7 +12,7 @@ import ( func TestTaskNGrinder(t *testing.T) { server := "http://ngrinder:8080" cloneId := "960" - logger, _ := logging.NewLoggerWithEncoding("debug", "console") + logger, _ := logger.NewLoggerWithEncoding("debug", "console") canary := "podinfo.default" taskFactory, ok := GetTaskFactory(TaskTypeNGrinder) if !ok { diff --git a/pkg/logging/logger.go b/pkg/logger/logger.go similarity index 88% rename from pkg/logging/logger.go rename to pkg/logger/logger.go index 8c1838e56..87ba8d761 100644 --- a/pkg/logging/logger.go +++ b/pkg/logger/logger.go @@ -1,9 +1,6 @@ -package logging +package logger import ( - "fmt" - "os" - "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -64,12 +61,3 @@ func NewLoggerWithEncoding(logLevel, zapEncoding string) (*zap.SugaredLogger, er } return logger.Sugar(), nil } - -// Console writes to stdout if the console env var exists -func Console(a ...interface{}) (n int, err error) { - if os.Getenv("console") != "" { - return fmt.Fprintln(os.Stdout, a...) - } - - return 0, nil -} diff --git a/pkg/router/router_test.go b/pkg/router/router_test.go index 08f77cf93..32c5f770b 100644 --- a/pkg/router/router_test.go +++ b/pkg/router/router_test.go @@ -6,7 +6,7 @@ import ( istiov1alpha3 "github.com/weaveworks/flagger/pkg/apis/istio/v1alpha3" clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" fakeFlagger "github.com/weaveworks/flagger/pkg/client/clientset/versioned/fake" - "github.com/weaveworks/flagger/pkg/logging" + "github.com/weaveworks/flagger/pkg/logger" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" hpav1 "k8s.io/api/autoscaling/v1" @@ -35,7 +35,7 @@ func setupfakeClients() fakeClients { kubeClient := fake.NewSimpleClientset(newMockDeployment(), newMockABTestDeployment()) meshClient := fakeFlagger.NewSimpleClientset() - logger, _ := logging.NewLogger("debug") + logger, _ := logger.NewLogger("debug") return fakeClients{ canary: canary, From 60f51ad7d50a0725f2eb02307b52be6497b85487 Mon Sep 17 00:00:00 2001 From: stefanprodan Date: Mon, 15 Apr 2019 11:27:08 +0300 Subject: [PATCH 2/4] Move deployer and config tracker to canary package --- pkg/canary/deployer.go | 338 ++++++++++++ pkg/{controller => canary}/deployer_test.go | 37 +- pkg/canary/mock.go | 470 ++++++++++++++++ pkg/canary/ready.go | 112 ++++ pkg/canary/status.go | 104 ++++ pkg/{controller => canary}/tracker.go | 46 +- pkg/controller/controller.go | 19 +- pkg/controller/controller_test.go | 31 +- pkg/controller/deployer.go | 566 -------------------- pkg/controller/scheduler.go | 30 +- 10 files changed, 1117 insertions(+), 636 deletions(-) create mode 100644 pkg/canary/deployer.go rename pkg/{controller => canary}/deployer_test.go (91%) create mode 100644 pkg/canary/mock.go create mode 100644 pkg/canary/ready.go create mode 100644 pkg/canary/status.go rename pkg/{controller => canary}/tracker.go (88%) delete mode 100644 pkg/controller/deployer.go diff --git a/pkg/canary/deployer.go b/pkg/canary/deployer.go new file mode 100644 index 000000000..937dff30a --- /dev/null +++ b/pkg/canary/deployer.go @@ -0,0 +1,338 @@ +package canary + +import ( + "crypto/rand" + "encoding/base64" + "encoding/json" + "fmt" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" + clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" + "go.uber.org/zap" + "io" + appsv1 "k8s.io/api/apps/v1" + hpav1 "k8s.io/api/autoscaling/v2beta1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" +) + +// Deployer is managing the operations for Kubernetes deployment kind +type Deployer struct { + KubeClient kubernetes.Interface + FlaggerClient clientset.Interface + Logger *zap.SugaredLogger + ConfigTracker ConfigTracker +} + +// Initialize creates the primary deployment and hpa +// and scales to zero the canary deployment +func (c *Deployer) Initialize(cd *flaggerv1.Canary) error { + primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) + if err := c.createPrimaryDeployment(cd); err != nil { + return fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err) + } + + if cd.Status.Phase == "" { + c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace) + if err := c.Scale(cd, 0); err != nil { + return err + } + } + + if cd.Spec.AutoscalerRef != nil && cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" { + if err := c.createPrimaryHpa(cd); err != nil { + return fmt.Errorf("creating hpa %s.%s failed: %v", primaryName, cd.Namespace, err) + } + } + return nil +} + +// Promote copies the pod spec, secrets and config maps from canary to primary +func (c *Deployer) Promote(cd *flaggerv1.Canary) error { + targetName := cd.Spec.TargetRef.Name + primaryName := fmt.Sprintf("%s-primary", targetName) + + canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace) + } + return fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err) + } + + primary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return fmt.Errorf("deployment %s.%s not found", primaryName, cd.Namespace) + } + return fmt.Errorf("deployment %s.%s query error %v", primaryName, cd.Namespace, err) + } + + // promote secrets and config maps + configRefs, err := c.ConfigTracker.GetTargetConfigs(cd) + if err != nil { + return err + } + if err := c.ConfigTracker.CreatePrimaryConfigs(cd, configRefs); err != nil { + return err + } + + primaryCopy := primary.DeepCopy() + primaryCopy.Spec.ProgressDeadlineSeconds = canary.Spec.ProgressDeadlineSeconds + primaryCopy.Spec.MinReadySeconds = canary.Spec.MinReadySeconds + primaryCopy.Spec.RevisionHistoryLimit = canary.Spec.RevisionHistoryLimit + primaryCopy.Spec.Strategy = canary.Spec.Strategy + + // update spec with primary secrets and config maps + primaryCopy.Spec.Template.Spec = c.ConfigTracker.ApplyPrimaryConfigs(canary.Spec.Template.Spec, configRefs) + + // update pod annotations to ensure a rolling update + annotations, err := c.makeAnnotations(canary.Spec.Template.Annotations) + if err != nil { + return err + } + primaryCopy.Spec.Template.Annotations = annotations + + primaryCopy.Spec.Template.Labels = makePrimaryLabels(canary.Spec.Template.Labels, primaryName) + + _, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Update(primaryCopy) + if err != nil { + return fmt.Errorf("updating deployment %s.%s template spec failed: %v", + primaryCopy.GetName(), primaryCopy.Namespace, err) + } + + return nil +} + +// HasDeploymentChanged returns true if the canary deployment pod spec has changed +func (c *Deployer) HasDeploymentChanged(cd *flaggerv1.Canary) (bool, error) { + targetName := cd.Spec.TargetRef.Name + canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return false, fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace) + } + return false, fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err) + } + + if cd.Status.LastAppliedSpec == "" { + return true, nil + } + + newSpec := &canary.Spec.Template.Spec + oldSpecJson, err := base64.StdEncoding.DecodeString(cd.Status.LastAppliedSpec) + if err != nil { + return false, fmt.Errorf("%s.%s decode error %v", cd.Name, cd.Namespace, err) + } + oldSpec := &corev1.PodSpec{} + err = json.Unmarshal(oldSpecJson, oldSpec) + if err != nil { + return false, fmt.Errorf("%s.%s unmarshal error %v", cd.Name, cd.Namespace, err) + } + + if diff := cmp.Diff(*newSpec, *oldSpec, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { + //fmt.Println(diff) + return true, nil + } + + return false, nil +} + +// Scale sets the canary deployment replicas +func (c *Deployer) Scale(cd *flaggerv1.Canary, replicas int32) error { + targetName := cd.Spec.TargetRef.Name + dep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace) + } + return fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err) + } + + depCopy := dep.DeepCopy() + depCopy.Spec.Replicas = int32p(replicas) + + _, err = c.KubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy) + if err != nil { + return fmt.Errorf("scaling %s.%s to %v failed: %v", depCopy.GetName(), depCopy.Namespace, replicas, err) + } + return nil +} + +func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) error { + targetName := cd.Spec.TargetRef.Name + primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) + + canaryDep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return fmt.Errorf("deployment %s.%s not found, retrying", targetName, cd.Namespace) + } + return err + } + + if appSel, ok := canaryDep.Spec.Selector.MatchLabels["app"]; !ok || appSel != canaryDep.Name { + return fmt.Errorf("invalid label selector! Deployment %s.%s spec.selector.matchLabels must contain selector 'app: %s'", + targetName, cd.Namespace, targetName) + } + + primaryDep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{}) + if errors.IsNotFound(err) { + // create primary secrets and config maps + configRefs, err := c.ConfigTracker.GetTargetConfigs(cd) + if err != nil { + return err + } + if err := c.ConfigTracker.CreatePrimaryConfigs(cd, configRefs); err != nil { + return err + } + annotations, err := c.makeAnnotations(canaryDep.Spec.Template.Annotations) + if err != nil { + return err + } + + replicas := int32(1) + if canaryDep.Spec.Replicas != nil && *canaryDep.Spec.Replicas > 0 { + replicas = *canaryDep.Spec.Replicas + } + + // create primary deployment + primaryDep = &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: primaryName, + Labels: canaryDep.Labels, + Namespace: cd.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(cd, schema.GroupVersionKind{ + Group: flaggerv1.SchemeGroupVersion.Group, + Version: flaggerv1.SchemeGroupVersion.Version, + Kind: flaggerv1.CanaryKind, + }), + }, + }, + Spec: appsv1.DeploymentSpec{ + ProgressDeadlineSeconds: canaryDep.Spec.ProgressDeadlineSeconds, + MinReadySeconds: canaryDep.Spec.MinReadySeconds, + RevisionHistoryLimit: canaryDep.Spec.RevisionHistoryLimit, + Replicas: int32p(replicas), + Strategy: canaryDep.Spec.Strategy, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": primaryName, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: makePrimaryLabels(canaryDep.Spec.Template.Labels, primaryName), + Annotations: annotations, + }, + // update spec with the primary secrets and config maps + Spec: c.ConfigTracker.ApplyPrimaryConfigs(canaryDep.Spec.Template.Spec, configRefs), + }, + }, + } + + _, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep) + if err != nil { + return err + } + + c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace) + } + + return nil +} + +func (c *Deployer) createPrimaryHpa(cd *flaggerv1.Canary) error { + primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) + hpa, err := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(cd.Spec.AutoscalerRef.Name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return fmt.Errorf("HorizontalPodAutoscaler %s.%s not found, retrying", + cd.Spec.AutoscalerRef.Name, cd.Namespace) + } + return err + } + primaryHpaName := fmt.Sprintf("%s-primary", cd.Spec.AutoscalerRef.Name) + primaryHpa, err := c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(primaryHpaName, metav1.GetOptions{}) + + if errors.IsNotFound(err) { + primaryHpa = &hpav1.HorizontalPodAutoscaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: primaryHpaName, + Namespace: cd.Namespace, + Labels: hpa.Labels, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(cd, schema.GroupVersionKind{ + Group: flaggerv1.SchemeGroupVersion.Group, + Version: flaggerv1.SchemeGroupVersion.Version, + Kind: flaggerv1.CanaryKind, + }), + }, + }, + Spec: hpav1.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: hpav1.CrossVersionObjectReference{ + Name: primaryName, + Kind: hpa.Spec.ScaleTargetRef.Kind, + APIVersion: hpa.Spec.ScaleTargetRef.APIVersion, + }, + MinReplicas: hpa.Spec.MinReplicas, + MaxReplicas: hpa.Spec.MaxReplicas, + Metrics: hpa.Spec.Metrics, + }, + } + + _, err = c.KubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Create(primaryHpa) + if err != nil { + return err + } + c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s created", primaryHpa.GetName(), cd.Namespace) + } + + return nil +} + +// makeAnnotations appends an unique ID to annotations map +func (c *Deployer) makeAnnotations(annotations map[string]string) (map[string]string, error) { + idKey := "flagger-id" + res := make(map[string]string) + uuid := make([]byte, 16) + n, err := io.ReadFull(rand.Reader, uuid) + if n != len(uuid) || err != nil { + return res, err + } + uuid[8] = uuid[8]&^0xc0 | 0x80 + uuid[6] = uuid[6]&^0xf0 | 0x40 + id := fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]) + + for k, v := range annotations { + if k != idKey { + res[k] = v + } + } + res[idKey] = id + + return res, nil +} + +func makePrimaryLabels(labels map[string]string, primaryName string) map[string]string { + idKey := "app" + res := make(map[string]string) + for k, v := range labels { + if k != idKey { + res[k] = v + } + } + res[idKey] = primaryName + + return res +} + +func int32p(i int32) *int32 { + return &i +} diff --git a/pkg/controller/deployer_test.go b/pkg/canary/deployer_test.go similarity index 91% rename from pkg/controller/deployer_test.go rename to pkg/canary/deployer_test.go index 099e01667..fc53558af 100644 --- a/pkg/controller/deployer_test.go +++ b/pkg/canary/deployer_test.go @@ -1,4 +1,4 @@ -package controller +package canary import ( "testing" @@ -8,8 +8,8 @@ import ( ) func TestCanaryDeployer_Sync(t *testing.T) { - mocks := SetupMocks(false) - err := mocks.deployer.Sync(mocks.canary) + mocks := SetupMocks() + err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -94,8 +94,8 @@ func TestCanaryDeployer_Sync(t *testing.T) { } func TestCanaryDeployer_IsNewSpec(t *testing.T) { - mocks := SetupMocks(false) - err := mocks.deployer.Sync(mocks.canary) + mocks := SetupMocks() + err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -106,7 +106,7 @@ func TestCanaryDeployer_IsNewSpec(t *testing.T) { t.Fatal(err.Error()) } - isNew, err := mocks.deployer.IsNewSpec(mocks.canary) + isNew, err := mocks.deployer.HasDeploymentChanged(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -117,8 +117,8 @@ func TestCanaryDeployer_IsNewSpec(t *testing.T) { } func TestCanaryDeployer_Promote(t *testing.T) { - mocks := SetupMocks(false) - err := mocks.deployer.Sync(mocks.canary) + mocks := SetupMocks() + err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -162,8 +162,8 @@ func TestCanaryDeployer_Promote(t *testing.T) { } func TestCanaryDeployer_IsReady(t *testing.T) { - mocks := SetupMocks(false) - err := mocks.deployer.Sync(mocks.canary) + mocks := SetupMocks() + err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Error("Expected primary readiness check to fail") } @@ -180,8 +180,8 @@ func TestCanaryDeployer_IsReady(t *testing.T) { } func TestCanaryDeployer_SetFailedChecks(t *testing.T) { - mocks := SetupMocks(false) - err := mocks.deployer.Sync(mocks.canary) + mocks := SetupMocks() + err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -202,8 +202,8 @@ func TestCanaryDeployer_SetFailedChecks(t *testing.T) { } func TestCanaryDeployer_SetState(t *testing.T) { - mocks := SetupMocks(false) - err := mocks.deployer.Sync(mocks.canary) + mocks := SetupMocks() + err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -224,8 +224,8 @@ func TestCanaryDeployer_SetState(t *testing.T) { } func TestCanaryDeployer_SyncStatus(t *testing.T) { - mocks := SetupMocks(false) - err := mocks.deployer.Sync(mocks.canary) + mocks := SetupMocks() + err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -263,8 +263,8 @@ func TestCanaryDeployer_SyncStatus(t *testing.T) { } func TestCanaryDeployer_Scale(t *testing.T) { - mocks := SetupMocks(false) - err := mocks.deployer.Sync(mocks.canary) + mocks := SetupMocks() + err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -279,5 +279,4 @@ func TestCanaryDeployer_Scale(t *testing.T) { if *c.Spec.Replicas != 2 { t.Errorf("Got replicas %v wanted %v", *c.Spec.Replicas, 2) } - } diff --git a/pkg/canary/mock.go b/pkg/canary/mock.go new file mode 100644 index 000000000..17a434aff --- /dev/null +++ b/pkg/canary/mock.go @@ -0,0 +1,470 @@ +package canary + +import ( + "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" + clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" + fakeFlagger "github.com/weaveworks/flagger/pkg/client/clientset/versioned/fake" + "github.com/weaveworks/flagger/pkg/logger" + "go.uber.org/zap" + appsv1 "k8s.io/api/apps/v1" + hpav1 "k8s.io/api/autoscaling/v1" + hpav2 "k8s.io/api/autoscaling/v2beta1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" +) + +type Mocks struct { + canary *v1alpha3.Canary + kubeClient kubernetes.Interface + flaggerClient clientset.Interface + deployer Deployer + logger *zap.SugaredLogger +} + +func SetupMocks() Mocks { + // init canary + canary := newTestCanary() + flaggerClient := fakeFlagger.NewSimpleClientset(canary) + + // init kube clientset and register mock objects + kubeClient := fake.NewSimpleClientset( + newTestDeployment(), + newTestHPA(), + NewTestConfigMap(), + NewTestConfigMapEnv(), + NewTestConfigMapVol(), + NewTestSecret(), + NewTestSecretEnv(), + NewTestSecretVol(), + ) + + logger, _ := logger.NewLogger("debug") + + deployer := Deployer{ + FlaggerClient: flaggerClient, + KubeClient: kubeClient, + Logger: logger, + ConfigTracker: ConfigTracker{ + Logger: logger, + KubeClient: kubeClient, + FlaggerClient: flaggerClient, + }, + } + + return Mocks{ + canary: canary, + deployer: deployer, + logger: logger, + flaggerClient: flaggerClient, + kubeClient: kubeClient, + } +} + +func NewTestConfigMap() *corev1.ConfigMap { + return &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo-config-env", + }, + Data: map[string]string{ + "color": "red", + }, + } +} + +func NewTestConfigMapV2() *corev1.ConfigMap { + return &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo-config-env", + }, + Data: map[string]string{ + "color": "blue", + "output": "console", + }, + } +} + +func NewTestConfigMapEnv() *corev1.ConfigMap { + return &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo-config-all-env", + }, + Data: map[string]string{ + "color": "red", + }, + } +} + +func NewTestConfigMapVol() *corev1.ConfigMap { + return &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo-config-vol", + }, + Data: map[string]string{ + "color": "red", + }, + } +} + +func NewTestSecret() *corev1.Secret { + return &corev1.Secret{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo-secret-env", + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "apiKey": []byte("test"), + }, + } +} + +func NewTestSecretV2() *corev1.Secret { + return &corev1.Secret{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo-secret-env", + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "apiKey": []byte("test2"), + }, + } +} + +func NewTestSecretEnv() *corev1.Secret { + return &corev1.Secret{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo-secret-all-env", + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "apiKey": []byte("test"), + }, + } +} + +func NewTestSecretVol() *corev1.Secret { + return &corev1.Secret{ + TypeMeta: metav1.TypeMeta{APIVersion: corev1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo-secret-vol", + }, + Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "apiKey": []byte("test"), + }, + } +} + +func newTestCanary() *v1alpha3.Canary { + cd := &v1alpha3.Canary{ + TypeMeta: metav1.TypeMeta{APIVersion: v1alpha3.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo", + }, + Spec: v1alpha3.CanarySpec{ + TargetRef: hpav1.CrossVersionObjectReference{ + Name: "podinfo", + APIVersion: "apps/v1", + Kind: "Deployment", + }, + AutoscalerRef: &hpav1.CrossVersionObjectReference{ + Name: "podinfo", + APIVersion: "autoscaling/v2beta1", + Kind: "HorizontalPodAutoscaler", + }, Service: v1alpha3.CanaryService{ + Port: 9898, + }, CanaryAnalysis: v1alpha3.CanaryAnalysis{ + Threshold: 10, + StepWeight: 10, + MaxWeight: 50, + Metrics: []v1alpha3.CanaryMetric{ + { + Name: "istio_requests_total", + Threshold: 99, + Interval: "1m", + }, + { + Name: "istio_request_duration_seconds_bucket", + Threshold: 500, + Interval: "1m", + }, + }, + }, + }, + } + return cd +} + +func newTestDeployment() *appsv1.Deployment { + d := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{APIVersion: appsv1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "podinfo", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "podinfo", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "podinfo", + Image: "quay.io/stefanprodan/podinfo:1.2.0", + Command: []string{ + "./podinfo", + "--port=9898", + }, + Args: nil, + WorkingDir: "", + Ports: []corev1.ContainerPort{ + { + Name: "http", + ContainerPort: 9898, + Protocol: corev1.ProtocolTCP, + }, + }, + Env: []corev1.EnvVar{ + { + Name: "PODINFO_UI_COLOR", + ValueFrom: &corev1.EnvVarSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "podinfo-config-env", + }, + Key: "color", + }, + }, + }, + { + Name: "API_KEY", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "podinfo-secret-env", + }, + Key: "apiKey", + }, + }, + }, + }, + EnvFrom: []corev1.EnvFromSource{ + { + ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "podinfo-config-all-env", + }, + }, + }, + { + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "podinfo-secret-all-env", + }, + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "config", + MountPath: "/etc/podinfo/config", + ReadOnly: true, + }, + { + Name: "secret", + MountPath: "/etc/podinfo/secret", + ReadOnly: true, + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "podinfo-config-vol", + }, + }, + }, + }, + { + Name: "secret", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "podinfo-secret-vol", + }, + }, + }, + }, + }, + }, + }, + } + + return d +} + +func newTestDeploymentV2() *appsv1.Deployment { + d := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{APIVersion: appsv1.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "podinfo", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "podinfo", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "podinfo", + Image: "quay.io/stefanprodan/podinfo:1.2.1", + Ports: []corev1.ContainerPort{ + { + Name: "http", + ContainerPort: 9898, + Protocol: corev1.ProtocolTCP, + }, + }, + Command: []string{ + "./podinfo", + "--port=9898", + }, + Env: []corev1.EnvVar{ + { + Name: "PODINFO_UI_COLOR", + ValueFrom: &corev1.EnvVarSource{ + ConfigMapKeyRef: &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "podinfo-config-env", + }, + Key: "color", + }, + }, + }, + { + Name: "API_KEY", + ValueFrom: &corev1.EnvVarSource{ + SecretKeyRef: &corev1.SecretKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "podinfo-secret-env", + }, + Key: "apiKey", + }, + }, + }, + }, + EnvFrom: []corev1.EnvFromSource{ + { + ConfigMapRef: &corev1.ConfigMapEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "podinfo-config-all-env", + }, + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "config", + MountPath: "/etc/podinfo/config", + ReadOnly: true, + }, + { + Name: "secret", + MountPath: "/etc/podinfo/secret", + ReadOnly: true, + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "podinfo-config-vol", + }, + }, + }, + }, + { + Name: "secret", + VolumeSource: corev1.VolumeSource{ + Secret: &corev1.SecretVolumeSource{ + SecretName: "podinfo-secret-vol", + }, + }, + }, + }, + }, + }, + }, + } + + return d +} + +func newTestHPA() *hpav2.HorizontalPodAutoscaler { + h := &hpav2.HorizontalPodAutoscaler{ + TypeMeta: metav1.TypeMeta{APIVersion: hpav2.SchemeGroupVersion.String()}, + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "podinfo", + }, + Spec: hpav2.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: hpav2.CrossVersionObjectReference{ + Name: "podinfo", + APIVersion: "apps/v1", + Kind: "Deployment", + }, + Metrics: []hpav2.MetricSpec{ + { + Type: "Resource", + Resource: &hpav2.ResourceMetricSource{ + Name: "cpu", + TargetAverageUtilization: int32p(99), + }, + }, + }, + }, + } + + return h +} diff --git a/pkg/canary/ready.go b/pkg/canary/ready.go new file mode 100644 index 000000000..c7884ba40 --- /dev/null +++ b/pkg/canary/ready.go @@ -0,0 +1,112 @@ +package canary + +import ( + "fmt" + "time" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// IsPrimaryReady checks the primary deployment status and returns an error if +// the deployment is in the middle of a rolling update or if the pods are unhealthy +// it will return a non retriable error if the rolling update is stuck +func (c *Deployer) IsPrimaryReady(cd *flaggerv1.Canary) (bool, error) { + primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) + primary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return true, fmt.Errorf("deployment %s.%s not found", primaryName, cd.Namespace) + } + return true, fmt.Errorf("deployment %s.%s query error %v", primaryName, cd.Namespace, err) + } + + retriable, err := c.isDeploymentReady(primary, cd.GetProgressDeadlineSeconds()) + if err != nil { + return retriable, fmt.Errorf("Halt advancement %s.%s %s", primaryName, cd.Namespace, err.Error()) + } + + if primary.Spec.Replicas == int32p(0) { + return true, fmt.Errorf("Halt %s.%s advancement primary deployment is scaled to zero", + cd.Name, cd.Namespace) + } + return true, nil +} + +// IsCanaryReady checks the primary deployment status and returns an error if +// the deployment is in the middle of a rolling update or if the pods are unhealthy +// it will return a non retriable error if the rolling update is stuck +func (c *Deployer) IsCanaryReady(cd *flaggerv1.Canary) (bool, error) { + targetName := cd.Spec.TargetRef.Name + canary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return true, fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace) + } + return true, fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err) + } + + retriable, err := c.isDeploymentReady(canary, cd.GetProgressDeadlineSeconds()) + if err != nil { + if retriable { + return retriable, fmt.Errorf("Halt advancement %s.%s %s", targetName, cd.Namespace, err.Error()) + } else { + return retriable, fmt.Errorf("deployment does not have minimum availability for more than %vs", + cd.GetProgressDeadlineSeconds()) + } + } + + return true, nil +} + +// isDeploymentReady determines if a deployment is ready by checking the status conditions +// if a deployment has exceeded the progress deadline it returns a non retriable error +func (c *Deployer) isDeploymentReady(deployment *appsv1.Deployment, deadline int) (bool, error) { + retriable := true + if deployment.Generation <= deployment.Status.ObservedGeneration { + progress := c.getDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing) + if progress != nil { + // Determine if the deployment is stuck by checking if there is a minimum replicas unavailable condition + // and if the last update time exceeds the deadline + available := c.getDeploymentCondition(deployment.Status, appsv1.DeploymentAvailable) + if available != nil && available.Status == "False" && available.Reason == "MinimumReplicasUnavailable" { + from := available.LastUpdateTime + delta := time.Duration(deadline) * time.Second + retriable = !from.Add(delta).Before(time.Now()) + } + } + + if progress != nil && progress.Reason == "ProgressDeadlineExceeded" { + return false, fmt.Errorf("deployment %q exceeded its progress deadline", deployment.GetName()) + } else if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas { + return retriable, fmt.Errorf("waiting for rollout to finish: %d out of %d new replicas have been updated", + deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas) + } else if deployment.Status.Replicas > deployment.Status.UpdatedReplicas { + return retriable, fmt.Errorf("waiting for rollout to finish: %d old replicas are pending termination", + deployment.Status.Replicas-deployment.Status.UpdatedReplicas) + } else if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas { + return retriable, fmt.Errorf("waiting for rollout to finish: %d of %d updated replicas are available", + deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas) + } + + } else { + return true, fmt.Errorf("waiting for rollout to finish: observed deployment generation less then desired generation") + } + + return true, nil +} + +func (c *Deployer) getDeploymentCondition( + status appsv1.DeploymentStatus, + conditionType appsv1.DeploymentConditionType, +) *appsv1.DeploymentCondition { + for i := range status.Conditions { + c := status.Conditions[i] + if c.Type == conditionType { + return &c + } + } + return nil +} diff --git a/pkg/canary/status.go b/pkg/canary/status.go new file mode 100644 index 000000000..e613e570d --- /dev/null +++ b/pkg/canary/status.go @@ -0,0 +1,104 @@ +package canary + +import ( + "encoding/base64" + "encoding/json" + "fmt" + + flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// SyncStatus encodes the canary pod spec and updates the canary status +func (c *Deployer) SyncStatus(cd *flaggerv1.Canary, status flaggerv1.CanaryStatus) error { + dep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(cd.Spec.TargetRef.Name, metav1.GetOptions{}) + if err != nil { + if errors.IsNotFound(err) { + return fmt.Errorf("deployment %s.%s not found", cd.Spec.TargetRef.Name, cd.Namespace) + } + return fmt.Errorf("deployment %s.%s query error %v", cd.Spec.TargetRef.Name, cd.Namespace, err) + } + + specJson, err := json.Marshal(dep.Spec.Template.Spec) + if err != nil { + return fmt.Errorf("deployment %s.%s marshal error %v", cd.Spec.TargetRef.Name, cd.Namespace, err) + } + + configs, err := c.ConfigTracker.GetConfigRefs(cd) + if err != nil { + return fmt.Errorf("configs query error %v", err) + } + + cdCopy := cd.DeepCopy() + cdCopy.Status.Phase = status.Phase + cdCopy.Status.CanaryWeight = status.CanaryWeight + cdCopy.Status.FailedChecks = status.FailedChecks + cdCopy.Status.Iterations = status.Iterations + cdCopy.Status.LastAppliedSpec = base64.StdEncoding.EncodeToString(specJson) + cdCopy.Status.LastTransitionTime = metav1.Now() + cdCopy.Status.TrackedConfigs = configs + + cd, err = c.FlaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) + if err != nil { + return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) + } + return nil +} + +// SetStatusFailedChecks updates the canary failed checks counter +func (c *Deployer) SetStatusFailedChecks(cd *flaggerv1.Canary, val int) error { + cdCopy := cd.DeepCopy() + cdCopy.Status.FailedChecks = val + cdCopy.Status.LastTransitionTime = metav1.Now() + + cd, err := c.FlaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) + if err != nil { + return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) + } + return nil +} + +// SetStatusWeight updates the canary status weight value +func (c *Deployer) SetStatusWeight(cd *flaggerv1.Canary, val int) error { + cdCopy := cd.DeepCopy() + cdCopy.Status.CanaryWeight = val + cdCopy.Status.LastTransitionTime = metav1.Now() + + cd, err := c.FlaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) + if err != nil { + return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) + } + return nil +} + +// SetStatusIterations updates the canary status iterations value +func (c *Deployer) SetStatusIterations(cd *flaggerv1.Canary, val int) error { + cdCopy := cd.DeepCopy() + cdCopy.Status.Iterations = val + cdCopy.Status.LastTransitionTime = metav1.Now() + + cd, err := c.FlaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) + if err != nil { + return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) + } + return nil +} + +// SetStatusPhase updates the canary status phase +func (c *Deployer) SetStatusPhase(cd *flaggerv1.Canary, phase flaggerv1.CanaryPhase) error { + cdCopy := cd.DeepCopy() + cdCopy.Status.Phase = phase + cdCopy.Status.LastTransitionTime = metav1.Now() + + if phase != flaggerv1.CanaryProgressing { + cdCopy.Status.CanaryWeight = 0 + cdCopy.Status.Iterations = 0 + } + + cd, err := c.FlaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) + if err != nil { + return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) + } + return nil +} diff --git a/pkg/controller/tracker.go b/pkg/canary/tracker.go similarity index 88% rename from pkg/controller/tracker.go rename to pkg/canary/tracker.go index 5644b875f..f5c413276 100644 --- a/pkg/controller/tracker.go +++ b/pkg/canary/tracker.go @@ -1,4 +1,4 @@ -package controller +package canary import ( "crypto/sha256" @@ -16,9 +16,9 @@ import ( // ConfigTracker is managing the operations for Kubernetes ConfigMaps and Secrets type ConfigTracker struct { - kubeClient kubernetes.Interface - flaggerClient clientset.Interface - logger *zap.SugaredLogger + KubeClient kubernetes.Interface + FlaggerClient clientset.Interface + Logger *zap.SugaredLogger } type ConfigRefType string @@ -50,7 +50,7 @@ func checksum(data interface{}) string { // getRefFromConfigMap transforms a Kubernetes ConfigMap into a ConfigRef // and computes the checksum of the ConfigMap data func (ct *ConfigTracker) getRefFromConfigMap(name string, namespace string) (*ConfigRef, error) { - config, err := ct.kubeClient.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) + config, err := ct.KubeClient.CoreV1().ConfigMaps(namespace).Get(name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -65,7 +65,7 @@ func (ct *ConfigTracker) getRefFromConfigMap(name string, namespace string) (*Co // getRefFromConfigMap transforms a Kubernetes Secret into a ConfigRef // and computes the checksum of the Secret data func (ct *ConfigTracker) getRefFromSecret(name string, namespace string) (*ConfigRef, error) { - secret, err := ct.kubeClient.CoreV1().Secrets(namespace).Get(name, metav1.GetOptions{}) + secret, err := ct.KubeClient.CoreV1().Secrets(namespace).Get(name, metav1.GetOptions{}) if err != nil { return nil, err } @@ -75,7 +75,7 @@ func (ct *ConfigTracker) getRefFromSecret(name string, namespace string) (*Confi secret.Type != corev1.SecretTypeBasicAuth && secret.Type != corev1.SecretTypeSSHAuth && secret.Type != corev1.SecretTypeTLS { - ct.logger.Debugf("ignoring secret %s.%s type not supported %v", name, namespace, secret.Type) + ct.Logger.Debugf("ignoring secret %s.%s type not supported %v", name, namespace, secret.Type) return nil, nil } @@ -91,7 +91,7 @@ func (ct *ConfigTracker) getRefFromSecret(name string, namespace string) (*Confi func (ct *ConfigTracker) GetTargetConfigs(cd *flaggerv1.Canary) (map[string]ConfigRef, error) { res := make(map[string]ConfigRef) targetName := cd.Spec.TargetRef.Name - targetDep, err := ct.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) + targetDep, err := ct.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return res, fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace) @@ -104,7 +104,7 @@ func (ct *ConfigTracker) GetTargetConfigs(cd *flaggerv1.Canary) (map[string]Conf if cmv := volume.ConfigMap; cmv != nil { config, err := ct.getRefFromConfigMap(cmv.Name, cd.Namespace) if err != nil { - ct.logger.Errorf("configMap %s.%s query error %v", cmv.Name, cd.Namespace, err) + ct.Logger.Errorf("configMap %s.%s query error %v", cmv.Name, cd.Namespace, err) continue } if config != nil { @@ -115,7 +115,7 @@ func (ct *ConfigTracker) GetTargetConfigs(cd *flaggerv1.Canary) (map[string]Conf if sv := volume.Secret; sv != nil { secret, err := ct.getRefFromSecret(sv.SecretName, cd.Namespace) if err != nil { - ct.logger.Errorf("secret %s.%s query error %v", sv.SecretName, cd.Namespace, err) + ct.Logger.Errorf("secret %s.%s query error %v", sv.SecretName, cd.Namespace, err) continue } if secret != nil { @@ -133,7 +133,7 @@ func (ct *ConfigTracker) GetTargetConfigs(cd *flaggerv1.Canary) (map[string]Conf name := env.ValueFrom.ConfigMapKeyRef.LocalObjectReference.Name config, err := ct.getRefFromConfigMap(name, cd.Namespace) if err != nil { - ct.logger.Errorf("configMap %s.%s query error %v", name, cd.Namespace, err) + ct.Logger.Errorf("configMap %s.%s query error %v", name, cd.Namespace, err) continue } if config != nil { @@ -143,7 +143,7 @@ func (ct *ConfigTracker) GetTargetConfigs(cd *flaggerv1.Canary) (map[string]Conf name := env.ValueFrom.SecretKeyRef.LocalObjectReference.Name secret, err := ct.getRefFromSecret(name, cd.Namespace) if err != nil { - ct.logger.Errorf("secret %s.%s query error %v", name, cd.Namespace, err) + ct.Logger.Errorf("secret %s.%s query error %v", name, cd.Namespace, err) continue } if secret != nil { @@ -159,7 +159,7 @@ func (ct *ConfigTracker) GetTargetConfigs(cd *flaggerv1.Canary) (map[string]Conf name := envFrom.ConfigMapRef.LocalObjectReference.Name config, err := ct.getRefFromConfigMap(name, cd.Namespace) if err != nil { - ct.logger.Errorf("configMap %s.%s query error %v", name, cd.Namespace, err) + ct.Logger.Errorf("configMap %s.%s query error %v", name, cd.Namespace, err) continue } if config != nil { @@ -169,7 +169,7 @@ func (ct *ConfigTracker) GetTargetConfigs(cd *flaggerv1.Canary) (map[string]Conf name := envFrom.SecretRef.LocalObjectReference.Name secret, err := ct.getRefFromSecret(name, cd.Namespace) if err != nil { - ct.logger.Errorf("secret %s.%s query error %v", name, cd.Namespace, err) + ct.Logger.Errorf("secret %s.%s query error %v", name, cd.Namespace, err) continue } if secret != nil { @@ -221,7 +221,7 @@ func (ct *ConfigTracker) HasConfigChanged(cd *flaggerv1.Canary) (bool, error) { for _, cfg := range configs { if trackedConfigs[cfg.GetName()] != cfg.Checksum { - ct.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)). + ct.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)). Infof("%s %s has changed", cfg.Type, cfg.Name) return true, nil } @@ -236,7 +236,7 @@ func (ct *ConfigTracker) CreatePrimaryConfigs(cd *flaggerv1.Canary, refs map[str for _, ref := range refs { switch ref.Type { case ConfigRefMap: - config, err := ct.kubeClient.CoreV1().ConfigMaps(cd.Namespace).Get(ref.Name, metav1.GetOptions{}) + config, err := ct.KubeClient.CoreV1().ConfigMaps(cd.Namespace).Get(ref.Name, metav1.GetOptions{}) if err != nil { return err } @@ -258,10 +258,10 @@ func (ct *ConfigTracker) CreatePrimaryConfigs(cd *flaggerv1.Canary, refs map[str } // update or insert primary ConfigMap - _, err = ct.kubeClient.CoreV1().ConfigMaps(cd.Namespace).Update(primaryConfigMap) + _, err = ct.KubeClient.CoreV1().ConfigMaps(cd.Namespace).Update(primaryConfigMap) if err != nil { if errors.IsNotFound(err) { - _, err = ct.kubeClient.CoreV1().ConfigMaps(cd.Namespace).Create(primaryConfigMap) + _, err = ct.KubeClient.CoreV1().ConfigMaps(cd.Namespace).Create(primaryConfigMap) if err != nil { return err } @@ -270,10 +270,10 @@ func (ct *ConfigTracker) CreatePrimaryConfigs(cd *flaggerv1.Canary, refs map[str } } - ct.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)). + ct.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)). Infof("ConfigMap %s synced", primaryConfigMap.GetName()) case ConfigRefSecret: - secret, err := ct.kubeClient.CoreV1().Secrets(cd.Namespace).Get(ref.Name, metav1.GetOptions{}) + secret, err := ct.KubeClient.CoreV1().Secrets(cd.Namespace).Get(ref.Name, metav1.GetOptions{}) if err != nil { return err } @@ -296,10 +296,10 @@ func (ct *ConfigTracker) CreatePrimaryConfigs(cd *flaggerv1.Canary, refs map[str } // update or insert primary Secret - _, err = ct.kubeClient.CoreV1().Secrets(cd.Namespace).Update(primarySecret) + _, err = ct.KubeClient.CoreV1().Secrets(cd.Namespace).Update(primarySecret) if err != nil { if errors.IsNotFound(err) { - _, err = ct.kubeClient.CoreV1().Secrets(cd.Namespace).Create(primarySecret) + _, err = ct.KubeClient.CoreV1().Secrets(cd.Namespace).Create(primarySecret) if err != nil { return err } @@ -308,7 +308,7 @@ func (ct *ConfigTracker) CreatePrimaryConfigs(cd *flaggerv1.Canary, refs map[str } } - ct.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)). + ct.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)). Infof("Secret %s synced", primarySecret.GetName()) } } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 6fbc2b91a..093884313 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -2,6 +2,7 @@ package controller import ( "fmt" + "github.com/weaveworks/flagger/pkg/canary" "github.com/weaveworks/flagger/pkg/metrics" "sync" "time" @@ -41,7 +42,7 @@ type Controller struct { logger *zap.SugaredLogger canaries *sync.Map jobs map[string]CanaryJob - deployer CanaryDeployer + deployer canary.Deployer observer metrics.Observer recorder metrics.Recorder notifier *notifier.Slack @@ -70,14 +71,14 @@ func NewController( eventRecorder := eventBroadcaster.NewRecorder( scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) - deployer := CanaryDeployer{ - logger: logger, - kubeClient: kubeClient, - flaggerClient: flaggerClient, - configTracker: ConfigTracker{ - logger: logger, - kubeClient: kubeClient, - flaggerClient: flaggerClient, + deployer := canary.Deployer{ + Logger: logger, + KubeClient: kubeClient, + FlaggerClient: flaggerClient, + ConfigTracker: canary.ConfigTracker{ + Logger: logger, + KubeClient: kubeClient, + FlaggerClient: flaggerClient, }, } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index dd605d7c0..2954d1253 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -4,10 +4,11 @@ import ( "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" istiov1alpha1 "github.com/weaveworks/flagger/pkg/apis/istio/common/v1alpha1" istiov1alpha3 "github.com/weaveworks/flagger/pkg/apis/istio/v1alpha3" + "github.com/weaveworks/flagger/pkg/canary" clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" fakeFlagger "github.com/weaveworks/flagger/pkg/client/clientset/versioned/fake" informers "github.com/weaveworks/flagger/pkg/client/informers/externalversions" - "github.com/weaveworks/flagger/pkg/logging" + "github.com/weaveworks/flagger/pkg/logger" "github.com/weaveworks/flagger/pkg/metrics" "github.com/weaveworks/flagger/pkg/router" "go.uber.org/zap" @@ -34,7 +35,7 @@ type Mocks struct { kubeClient kubernetes.Interface meshClient clientset.Interface flaggerClient clientset.Interface - deployer CanaryDeployer + deployer canary.Deployer observer metrics.Observer ctrl *Controller logger *zap.SugaredLogger @@ -43,11 +44,11 @@ type Mocks struct { func SetupMocks(abtest bool) Mocks { // init canary - canary := newTestCanary() + c := newTestCanary() if abtest { - canary = newTestCanaryAB() + c = newTestCanaryAB() } - flaggerClient := fakeFlagger.NewSimpleClientset(canary) + flaggerClient := fakeFlagger.NewSimpleClientset(c) // init kube clientset and register mock objects kubeClient := fake.NewSimpleClientset( @@ -61,17 +62,17 @@ func SetupMocks(abtest bool) Mocks { NewTestSecretVol(), ) - logger, _ := logging.NewLogger("debug") + logger, _ := logger.NewLogger("debug") // init controller helpers - deployer := CanaryDeployer{ - flaggerClient: flaggerClient, - kubeClient: kubeClient, - logger: logger, - configTracker: ConfigTracker{ - logger: logger, - kubeClient: kubeClient, - flaggerClient: flaggerClient, + deployer := canary.Deployer{ + Logger: logger, + KubeClient: kubeClient, + FlaggerClient: flaggerClient, + ConfigTracker: canary.ConfigTracker{ + Logger: logger, + KubeClient: kubeClient, + FlaggerClient: flaggerClient, }, } observer := metrics.NewObserver("fake") @@ -102,7 +103,7 @@ func SetupMocks(abtest bool) Mocks { meshRouter := rf.MeshRouter("istio") return Mocks{ - canary: canary, + canary: c, observer: observer, deployer: deployer, logger: logger, diff --git a/pkg/controller/deployer.go b/pkg/controller/deployer.go deleted file mode 100644 index 8bd0c80cd..000000000 --- a/pkg/controller/deployer.go +++ /dev/null @@ -1,566 +0,0 @@ -package controller - -import ( - "crypto/rand" - "encoding/base64" - "encoding/json" - "fmt" - "io" - "time" - - "github.com/google/go-cmp/cmp" - "github.com/google/go-cmp/cmp/cmpopts" - flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3" - clientset "github.com/weaveworks/flagger/pkg/client/clientset/versioned" - "go.uber.org/zap" - appsv1 "k8s.io/api/apps/v1" - hpav1 "k8s.io/api/autoscaling/v2beta1" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/kubernetes" -) - -// CanaryDeployer is managing the operations for Kubernetes deployment kind -type CanaryDeployer struct { - kubeClient kubernetes.Interface - flaggerClient clientset.Interface - logger *zap.SugaredLogger - configTracker ConfigTracker -} - -// Promote copies the pod spec, secrets and config maps from canary to primary -func (c *CanaryDeployer) Promote(cd *flaggerv1.Canary) error { - targetName := cd.Spec.TargetRef.Name - primaryName := fmt.Sprintf("%s-primary", targetName) - - canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace) - } - return fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err) - } - - primary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return fmt.Errorf("deployment %s.%s not found", primaryName, cd.Namespace) - } - return fmt.Errorf("deployment %s.%s query error %v", primaryName, cd.Namespace, err) - } - - // promote secrets and config maps - configRefs, err := c.configTracker.GetTargetConfigs(cd) - if err != nil { - return err - } - if err := c.configTracker.CreatePrimaryConfigs(cd, configRefs); err != nil { - return err - } - - primaryCopy := primary.DeepCopy() - primaryCopy.Spec.ProgressDeadlineSeconds = canary.Spec.ProgressDeadlineSeconds - primaryCopy.Spec.MinReadySeconds = canary.Spec.MinReadySeconds - primaryCopy.Spec.RevisionHistoryLimit = canary.Spec.RevisionHistoryLimit - primaryCopy.Spec.Strategy = canary.Spec.Strategy - - // update spec with primary secrets and config maps - primaryCopy.Spec.Template.Spec = c.configTracker.ApplyPrimaryConfigs(canary.Spec.Template.Spec, configRefs) - - // update pod annotations to ensure a rolling update - annotations, err := c.makeAnnotations(canary.Spec.Template.Annotations) - if err != nil { - return err - } - primaryCopy.Spec.Template.Annotations = annotations - - primaryCopy.Spec.Template.Labels = makePrimaryLabels(canary.Spec.Template.Labels, primaryName) - - _, err = c.kubeClient.AppsV1().Deployments(cd.Namespace).Update(primaryCopy) - if err != nil { - return fmt.Errorf("updating deployment %s.%s template spec failed: %v", - primaryCopy.GetName(), primaryCopy.Namespace, err) - } - - return nil -} - -// IsPrimaryReady checks the primary deployment status and returns an error if -// the deployment is in the middle of a rolling update or if the pods are unhealthy -// it will return a non retriable error if the rolling update is stuck -func (c *CanaryDeployer) IsPrimaryReady(cd *flaggerv1.Canary) (bool, error) { - primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) - primary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return true, fmt.Errorf("deployment %s.%s not found", primaryName, cd.Namespace) - } - return true, fmt.Errorf("deployment %s.%s query error %v", primaryName, cd.Namespace, err) - } - - retriable, err := c.isDeploymentReady(primary, cd.GetProgressDeadlineSeconds()) - if err != nil { - return retriable, fmt.Errorf("Halt advancement %s.%s %s", primaryName, cd.Namespace, err.Error()) - } - - if primary.Spec.Replicas == int32p(0) { - return true, fmt.Errorf("Halt %s.%s advancement primary deployment is scaled to zero", - cd.Name, cd.Namespace) - } - return true, nil -} - -// IsCanaryReady checks the primary deployment status and returns an error if -// the deployment is in the middle of a rolling update or if the pods are unhealthy -// it will return a non retriable error if the rolling update is stuck -func (c *CanaryDeployer) IsCanaryReady(cd *flaggerv1.Canary) (bool, error) { - targetName := cd.Spec.TargetRef.Name - canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return true, fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace) - } - return true, fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err) - } - - retriable, err := c.isDeploymentReady(canary, cd.GetProgressDeadlineSeconds()) - if err != nil { - if retriable { - return retriable, fmt.Errorf("Halt advancement %s.%s %s", targetName, cd.Namespace, err.Error()) - } else { - return retriable, fmt.Errorf("deployment does not have minimum availability for more than %vs", - cd.GetProgressDeadlineSeconds()) - } - } - - return true, nil -} - -// IsNewSpec returns true if the canary deployment pod spec has changed -func (c *CanaryDeployer) IsNewSpec(cd *flaggerv1.Canary) (bool, error) { - targetName := cd.Spec.TargetRef.Name - canary, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return false, fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace) - } - return false, fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err) - } - - if cd.Status.LastAppliedSpec == "" { - return true, nil - } - - newSpec := &canary.Spec.Template.Spec - oldSpecJson, err := base64.StdEncoding.DecodeString(cd.Status.LastAppliedSpec) - if err != nil { - return false, fmt.Errorf("%s.%s decode error %v", cd.Name, cd.Namespace, err) - } - oldSpec := &corev1.PodSpec{} - err = json.Unmarshal(oldSpecJson, oldSpec) - if err != nil { - return false, fmt.Errorf("%s.%s unmarshal error %v", cd.Name, cd.Namespace, err) - } - - if diff := cmp.Diff(*newSpec, *oldSpec, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { - //fmt.Println(diff) - return true, nil - } - - return false, nil -} - -// ShouldAdvance determines if the canary analysis can proceed -func (c *CanaryDeployer) ShouldAdvance(cd *flaggerv1.Canary) (bool, error) { - if cd.Status.LastAppliedSpec == "" || cd.Status.Phase == flaggerv1.CanaryProgressing { - return true, nil - } - - newDep, err := c.IsNewSpec(cd) - if err != nil { - return false, err - } - if newDep { - return newDep, nil - } - - newCfg, err := c.configTracker.HasConfigChanged(cd) - if err != nil { - return false, err - } - - return newCfg, nil - -} - -// SetStatusFailedChecks updates the canary failed checks counter -func (c *CanaryDeployer) SetStatusFailedChecks(cd *flaggerv1.Canary, val int) error { - cdCopy := cd.DeepCopy() - cdCopy.Status.FailedChecks = val - cdCopy.Status.LastTransitionTime = metav1.Now() - - cd, err := c.flaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) - if err != nil { - return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) - } - return nil -} - -// SetStatusWeight updates the canary status weight value -func (c *CanaryDeployer) SetStatusWeight(cd *flaggerv1.Canary, val int) error { - cdCopy := cd.DeepCopy() - cdCopy.Status.CanaryWeight = val - cdCopy.Status.LastTransitionTime = metav1.Now() - - cd, err := c.flaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) - if err != nil { - return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) - } - return nil -} - -// SetStatusIterations updates the canary status iterations value -func (c *CanaryDeployer) SetStatusIterations(cd *flaggerv1.Canary, val int) error { - cdCopy := cd.DeepCopy() - cdCopy.Status.Iterations = val - cdCopy.Status.LastTransitionTime = metav1.Now() - - cd, err := c.flaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) - if err != nil { - return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) - } - return nil -} - -// SetStatusWeight updates the canary status weight value -func (c *CanaryDeployer) IncrementStatusIterations(cd *flaggerv1.Canary) error { - cdCopy := cd.DeepCopy() - cdCopy.Status.Iterations = cdCopy.Status.Iterations + 1 - cdCopy.Status.LastTransitionTime = metav1.Now() - - cd, err := c.flaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) - if err != nil { - return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) - } - return nil -} - -// SetStatusPhase updates the canary status phase -func (c *CanaryDeployer) SetStatusPhase(cd *flaggerv1.Canary, phase flaggerv1.CanaryPhase) error { - cdCopy := cd.DeepCopy() - cdCopy.Status.Phase = phase - cdCopy.Status.LastTransitionTime = metav1.Now() - - if phase != flaggerv1.CanaryProgressing { - cdCopy.Status.CanaryWeight = 0 - cdCopy.Status.Iterations = 0 - } - - cd, err := c.flaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) - if err != nil { - return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) - } - return nil -} - -// SyncStatus encodes the canary pod spec and updates the canary status -func (c *CanaryDeployer) SyncStatus(cd *flaggerv1.Canary, status flaggerv1.CanaryStatus) error { - dep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(cd.Spec.TargetRef.Name, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return fmt.Errorf("deployment %s.%s not found", cd.Spec.TargetRef.Name, cd.Namespace) - } - return fmt.Errorf("deployment %s.%s query error %v", cd.Spec.TargetRef.Name, cd.Namespace, err) - } - - specJson, err := json.Marshal(dep.Spec.Template.Spec) - if err != nil { - return fmt.Errorf("deployment %s.%s marshal error %v", cd.Spec.TargetRef.Name, cd.Namespace, err) - } - - configs, err := c.configTracker.GetConfigRefs(cd) - if err != nil { - return fmt.Errorf("configs query error %v", err) - } - - cdCopy := cd.DeepCopy() - cdCopy.Status.Phase = status.Phase - cdCopy.Status.CanaryWeight = status.CanaryWeight - cdCopy.Status.FailedChecks = status.FailedChecks - cdCopy.Status.Iterations = status.Iterations - cdCopy.Status.LastAppliedSpec = base64.StdEncoding.EncodeToString(specJson) - cdCopy.Status.LastTransitionTime = metav1.Now() - cdCopy.Status.TrackedConfigs = configs - - cd, err = c.flaggerClient.FlaggerV1alpha3().Canaries(cd.Namespace).UpdateStatus(cdCopy) - if err != nil { - return fmt.Errorf("canary %s.%s status update error %v", cdCopy.Name, cdCopy.Namespace, err) - } - return nil -} - -// Scale sets the canary deployment replicas -func (c *CanaryDeployer) Scale(cd *flaggerv1.Canary, replicas int32) error { - targetName := cd.Spec.TargetRef.Name - dep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return fmt.Errorf("deployment %s.%s not found", targetName, cd.Namespace) - } - return fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err) - } - - depCopy := dep.DeepCopy() - depCopy.Spec.Replicas = int32p(replicas) - - _, err = c.kubeClient.AppsV1().Deployments(dep.Namespace).Update(depCopy) - if err != nil { - return fmt.Errorf("scaling %s.%s to %v failed: %v", depCopy.GetName(), depCopy.Namespace, replicas, err) - } - return nil -} - -// Reconcile creates the primary deployment and hpa -// and scales to zero the canary deployment -func (c *CanaryDeployer) Sync(cd *flaggerv1.Canary) error { - primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) - if err := c.createPrimaryDeployment(cd); err != nil { - return fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err) - } - - if cd.Status.Phase == "" { - c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace) - if err := c.Scale(cd, 0); err != nil { - return err - } - } - - if cd.Spec.AutoscalerRef != nil && cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" { - if err := c.createPrimaryHpa(cd); err != nil { - return fmt.Errorf("creating hpa %s.%s failed: %v", primaryName, cd.Namespace, err) - } - } - return nil -} - -func (c *CanaryDeployer) createPrimaryDeployment(cd *flaggerv1.Canary) error { - targetName := cd.Spec.TargetRef.Name - primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) - - canaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return fmt.Errorf("deployment %s.%s not found, retrying", targetName, cd.Namespace) - } - return err - } - - if appSel, ok := canaryDep.Spec.Selector.MatchLabels["app"]; !ok || appSel != canaryDep.Name { - return fmt.Errorf("invalid label selector! Deployment %s.%s spec.selector.matchLabels must contain selector 'app: %s'", - targetName, cd.Namespace, targetName) - } - - primaryDep, err := c.kubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{}) - if errors.IsNotFound(err) { - // create primary secrets and config maps - configRefs, err := c.configTracker.GetTargetConfigs(cd) - if err != nil { - return err - } - if err := c.configTracker.CreatePrimaryConfigs(cd, configRefs); err != nil { - return err - } - annotations, err := c.makeAnnotations(canaryDep.Spec.Template.Annotations) - if err != nil { - return err - } - - replicas := int32(1) - if canaryDep.Spec.Replicas != nil && *canaryDep.Spec.Replicas > 0 { - replicas = *canaryDep.Spec.Replicas - } - - // create primary deployment - primaryDep = &appsv1.Deployment{ - ObjectMeta: metav1.ObjectMeta{ - Name: primaryName, - Labels: canaryDep.Labels, - Namespace: cd.Namespace, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(cd, schema.GroupVersionKind{ - Group: flaggerv1.SchemeGroupVersion.Group, - Version: flaggerv1.SchemeGroupVersion.Version, - Kind: flaggerv1.CanaryKind, - }), - }, - }, - Spec: appsv1.DeploymentSpec{ - ProgressDeadlineSeconds: canaryDep.Spec.ProgressDeadlineSeconds, - MinReadySeconds: canaryDep.Spec.MinReadySeconds, - RevisionHistoryLimit: canaryDep.Spec.RevisionHistoryLimit, - Replicas: int32p(replicas), - Strategy: canaryDep.Spec.Strategy, - Selector: &metav1.LabelSelector{ - MatchLabels: map[string]string{ - "app": primaryName, - }, - }, - Template: corev1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ - Labels: makePrimaryLabels(canaryDep.Spec.Template.Labels, primaryName), - Annotations: annotations, - }, - // update spec with the primary secrets and config maps - Spec: c.configTracker.ApplyPrimaryConfigs(canaryDep.Spec.Template.Spec, configRefs), - }, - }, - } - - _, err = c.kubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep) - if err != nil { - return err - } - - c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace) - } - - return nil -} - -func (c *CanaryDeployer) createPrimaryHpa(cd *flaggerv1.Canary) error { - primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) - hpa, err := c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(cd.Spec.AutoscalerRef.Name, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return fmt.Errorf("HorizontalPodAutoscaler %s.%s not found, retrying", - cd.Spec.AutoscalerRef.Name, cd.Namespace) - } - return err - } - primaryHpaName := fmt.Sprintf("%s-primary", cd.Spec.AutoscalerRef.Name) - primaryHpa, err := c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Get(primaryHpaName, metav1.GetOptions{}) - - if errors.IsNotFound(err) { - primaryHpa = &hpav1.HorizontalPodAutoscaler{ - ObjectMeta: metav1.ObjectMeta{ - Name: primaryHpaName, - Namespace: cd.Namespace, - Labels: hpa.Labels, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(cd, schema.GroupVersionKind{ - Group: flaggerv1.SchemeGroupVersion.Group, - Version: flaggerv1.SchemeGroupVersion.Version, - Kind: flaggerv1.CanaryKind, - }), - }, - }, - Spec: hpav1.HorizontalPodAutoscalerSpec{ - ScaleTargetRef: hpav1.CrossVersionObjectReference{ - Name: primaryName, - Kind: hpa.Spec.ScaleTargetRef.Kind, - APIVersion: hpa.Spec.ScaleTargetRef.APIVersion, - }, - MinReplicas: hpa.Spec.MinReplicas, - MaxReplicas: hpa.Spec.MaxReplicas, - Metrics: hpa.Spec.Metrics, - }, - } - - _, err = c.kubeClient.AutoscalingV2beta1().HorizontalPodAutoscalers(cd.Namespace).Create(primaryHpa) - if err != nil { - return err - } - c.logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("HorizontalPodAutoscaler %s.%s created", primaryHpa.GetName(), cd.Namespace) - } - - return nil -} - -// isDeploymentReady determines if a deployment is ready by checking the status conditions -// if a deployment has exceeded the progress deadline it returns a non retriable error -func (c *CanaryDeployer) isDeploymentReady(deployment *appsv1.Deployment, deadline int) (bool, error) { - retriable := true - if deployment.Generation <= deployment.Status.ObservedGeneration { - progress := c.getDeploymentCondition(deployment.Status, appsv1.DeploymentProgressing) - if progress != nil { - // Determine if the deployment is stuck by checking if there is a minimum replicas unavailable condition - // and if the last update time exceeds the deadline - available := c.getDeploymentCondition(deployment.Status, appsv1.DeploymentAvailable) - if available != nil && available.Status == "False" && available.Reason == "MinimumReplicasUnavailable" { - from := available.LastUpdateTime - delta := time.Duration(deadline) * time.Second - retriable = !from.Add(delta).Before(time.Now()) - } - } - - if progress != nil && progress.Reason == "ProgressDeadlineExceeded" { - return false, fmt.Errorf("deployment %q exceeded its progress deadline", deployment.GetName()) - } else if deployment.Spec.Replicas != nil && deployment.Status.UpdatedReplicas < *deployment.Spec.Replicas { - return retriable, fmt.Errorf("waiting for rollout to finish: %d out of %d new replicas have been updated", - deployment.Status.UpdatedReplicas, *deployment.Spec.Replicas) - } else if deployment.Status.Replicas > deployment.Status.UpdatedReplicas { - return retriable, fmt.Errorf("waiting for rollout to finish: %d old replicas are pending termination", - deployment.Status.Replicas-deployment.Status.UpdatedReplicas) - } else if deployment.Status.AvailableReplicas < deployment.Status.UpdatedReplicas { - return retriable, fmt.Errorf("waiting for rollout to finish: %d of %d updated replicas are available", - deployment.Status.AvailableReplicas, deployment.Status.UpdatedReplicas) - } - - } else { - return true, fmt.Errorf("waiting for rollout to finish: observed deployment generation less then desired generation") - } - - return true, nil -} - -func (c *CanaryDeployer) getDeploymentCondition( - status appsv1.DeploymentStatus, - conditionType appsv1.DeploymentConditionType, -) *appsv1.DeploymentCondition { - for i := range status.Conditions { - c := status.Conditions[i] - if c.Type == conditionType { - return &c - } - } - return nil -} - -// makeAnnotations appends an unique ID to annotations map -func (c *CanaryDeployer) makeAnnotations(annotations map[string]string) (map[string]string, error) { - idKey := "flagger-id" - res := make(map[string]string) - uuid := make([]byte, 16) - n, err := io.ReadFull(rand.Reader, uuid) - if n != len(uuid) || err != nil { - return res, err - } - uuid[8] = uuid[8]&^0xc0 | 0x80 - uuid[6] = uuid[6]&^0xf0 | 0x40 - id := fmt.Sprintf("%x-%x-%x-%x-%x", uuid[0:4], uuid[4:6], uuid[6:8], uuid[8:10], uuid[10:]) - - for k, v := range annotations { - if k != idKey { - res[k] = v - } - } - res[idKey] = id - - return res, nil -} - -func makePrimaryLabels(labels map[string]string, primaryName string) map[string]string { - idKey := "app" - res := make(map[string]string) - for k, v := range labels { - if k != idKey { - res[k] = v - } - } - res[idKey] = primaryName - - return res -} diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index 9af0b3544..b2602bab3 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -90,7 +90,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) // create primary deployment and hpa if needed - if err := c.deployer.Sync(cd); err != nil { + if err := c.deployer.Initialize(cd); err != nil { c.recordEventWarningf(cd, "%v", err) return } @@ -111,7 +111,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh return } - shouldAdvance, err := c.deployer.ShouldAdvance(cd) + shouldAdvance, err := c.shouldAdvance(cd) if err != nil { c.recordEventWarningf(cd, "%v", err) return @@ -441,6 +441,28 @@ func (c *Controller) shouldSkipAnalysis(cd *flaggerv1.Canary, meshRouter router. return true } +func (c *Controller) shouldAdvance(cd *flaggerv1.Canary) (bool, error) { + if cd.Status.LastAppliedSpec == "" || cd.Status.Phase == flaggerv1.CanaryProgressing { + return true, nil + } + + newDep, err := c.deployer.HasDeploymentChanged(cd) + if err != nil { + return false, err + } + if newDep { + return newDep, nil + } + + newCfg, err := c.deployer.ConfigTracker.HasConfigChanged(cd) + if err != nil { + return false, err + } + + return newCfg, nil + +} + func (c *Controller) checkCanaryStatus(cd *flaggerv1.Canary, shouldAdvance bool) bool { c.recorder.SetStatus(cd, cd.Status.Phase) if cd.Status.Phase == flaggerv1.CanaryProgressing { @@ -479,10 +501,10 @@ func (c *Controller) checkCanaryStatus(cd *flaggerv1.Canary, shouldAdvance bool) func (c *Controller) hasCanaryRevisionChanged(cd *flaggerv1.Canary) bool { if cd.Status.Phase == flaggerv1.CanaryProgressing { - if diff, _ := c.deployer.IsNewSpec(cd); diff { + if diff, _ := c.deployer.HasDeploymentChanged(cd); diff { return true } - if diff, _ := c.deployer.configTracker.HasConfigChanged(cd); diff { + if diff, _ := c.deployer.ConfigTracker.HasConfigChanged(cd); diff { return true } } From 6ef72e25509cfae8ef3eb2ca95343c84bb8e0d49 Mon Sep 17 00:00:00 2001 From: stefanprodan Date: Mon, 15 Apr 2019 12:57:25 +0300 Subject: [PATCH 3/4] Make the pod selector configurable - default labels: app, name and app.kubernetes.io/name --- cmd/flagger/main.go | 9 ++++ pkg/canary/deployer.go | 69 ++++++++++++++++++++----------- pkg/canary/deployer_test.go | 16 +++---- pkg/canary/mock.go | 9 ++-- pkg/controller/controller.go | 2 + pkg/controller/controller_test.go | 1 + pkg/controller/scheduler.go | 5 ++- pkg/router/factory.go | 3 +- pkg/router/kubernetes.go | 3 +- 9 files changed, 76 insertions(+), 41 deletions(-) diff --git a/cmd/flagger/main.go b/cmd/flagger/main.go index e0d702f46..3281235f0 100644 --- a/cmd/flagger/main.go +++ b/cmd/flagger/main.go @@ -18,6 +18,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "log" + "strings" "time" ) @@ -36,6 +37,7 @@ var ( zapEncoding string namespace string meshProvider string + selectorLabels string ) func init() { @@ -53,6 +55,7 @@ func init() { flag.StringVar(&zapEncoding, "zap-encoding", "json", "Zap logger encoding.") flag.StringVar(&namespace, "namespace", "", "Namespace that flagger would watch canary object") flag.StringVar(&meshProvider, "mesh-provider", "istio", "Service mesh provider, can be istio or appmesh") + flag.StringVar(&selectorLabels, "selector-labels", "app,name,app.kubernetes.io/name", "List of pod labels that Flagger uses to create pod selectors") } func main() { @@ -101,6 +104,11 @@ func main() { logger.Fatalf("Error calling Kubernetes API: %v", err) } + labels := strings.Split(selectorLabels, ",") + if len(labels) < 1 { + logger.Fatalf("At least one selector label is required") + } + logger.Infof("Connected to Kubernetes API %s", ver) if namespace != "" { logger.Infof("Watching namespace %s", namespace) @@ -137,6 +145,7 @@ func main() { slack, meshProvider, version.VERSION, + labels, ) flaggerInformerFactory.Start(stopCh) diff --git a/pkg/canary/deployer.go b/pkg/canary/deployer.go index 937dff30a..bb8aa1210 100644 --- a/pkg/canary/deployer.go +++ b/pkg/canary/deployer.go @@ -27,29 +27,31 @@ type Deployer struct { FlaggerClient clientset.Interface Logger *zap.SugaredLogger ConfigTracker ConfigTracker + Labels []string } -// Initialize creates the primary deployment and hpa -// and scales to zero the canary deployment -func (c *Deployer) Initialize(cd *flaggerv1.Canary) error { +// Initialize creates the primary deployment, hpa, +// scales to zero the canary deployment and returns the pod selector label +func (c *Deployer) Initialize(cd *flaggerv1.Canary) (string, error) { primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) - if err := c.createPrimaryDeployment(cd); err != nil { - return fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err) + label, err := c.createPrimaryDeployment(cd) + if err != nil { + return "", fmt.Errorf("creating deployment %s.%s failed: %v", primaryName, cd.Namespace, err) } if cd.Status.Phase == "" { c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Scaling down %s.%s", cd.Spec.TargetRef.Name, cd.Namespace) if err := c.Scale(cd, 0); err != nil { - return err + return "", err } } if cd.Spec.AutoscalerRef != nil && cd.Spec.AutoscalerRef.Kind == "HorizontalPodAutoscaler" { if err := c.createPrimaryHpa(cd); err != nil { - return fmt.Errorf("creating hpa %s.%s failed: %v", primaryName, cd.Namespace, err) + return "", fmt.Errorf("creating hpa %s.%s failed: %v", primaryName, cd.Namespace, err) } } - return nil + return label, nil } // Promote copies the pod spec, secrets and config maps from canary to primary @@ -65,6 +67,12 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error { return fmt.Errorf("deployment %s.%s query error %v", targetName, cd.Namespace, err) } + label, err := c.getSelectorLabel(canary) + if err != nil { + return fmt.Errorf("invalid label selector! Deployment %s.%s spec.selector.matchLabels must contain selector 'app: %s'", + targetName, cd.Namespace, targetName) + } + primary, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(primaryName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { @@ -98,7 +106,7 @@ func (c *Deployer) Promote(cd *flaggerv1.Canary) error { } primaryCopy.Spec.Template.Annotations = annotations - primaryCopy.Spec.Template.Labels = makePrimaryLabels(canary.Spec.Template.Labels, primaryName) + primaryCopy.Spec.Template.Labels = makePrimaryLabels(canary.Spec.Template.Labels, primaryName, label) _, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Update(primaryCopy) if err != nil { @@ -164,20 +172,21 @@ func (c *Deployer) Scale(cd *flaggerv1.Canary, replicas int32) error { return nil } -func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) error { +func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) (string, error) { targetName := cd.Spec.TargetRef.Name primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) canaryDep, err := c.KubeClient.AppsV1().Deployments(cd.Namespace).Get(targetName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { - return fmt.Errorf("deployment %s.%s not found, retrying", targetName, cd.Namespace) + return "", fmt.Errorf("deployment %s.%s not found, retrying", targetName, cd.Namespace) } - return err + return "", err } - if appSel, ok := canaryDep.Spec.Selector.MatchLabels["app"]; !ok || appSel != canaryDep.Name { - return fmt.Errorf("invalid label selector! Deployment %s.%s spec.selector.matchLabels must contain selector 'app: %s'", + label, err := c.getSelectorLabel(canaryDep) + if err != nil { + return "", fmt.Errorf("invalid label selector! Deployment %s.%s spec.selector.matchLabels must contain selector 'app: %s'", targetName, cd.Namespace, targetName) } @@ -186,14 +195,14 @@ func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) error { // create primary secrets and config maps configRefs, err := c.ConfigTracker.GetTargetConfigs(cd) if err != nil { - return err + return "", err } if err := c.ConfigTracker.CreatePrimaryConfigs(cd, configRefs); err != nil { - return err + return "", err } annotations, err := c.makeAnnotations(canaryDep.Spec.Template.Annotations) if err != nil { - return err + return "", err } replicas := int32(1) @@ -223,12 +232,12 @@ func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) error { Strategy: canaryDep.Spec.Strategy, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "app": primaryName, + label: primaryName, }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ - Labels: makePrimaryLabels(canaryDep.Spec.Template.Labels, primaryName), + Labels: makePrimaryLabels(canaryDep.Spec.Template.Labels, primaryName, label), Annotations: annotations, }, // update spec with the primary secrets and config maps @@ -239,13 +248,13 @@ func (c *Deployer) createPrimaryDeployment(cd *flaggerv1.Canary) error { _, err = c.KubeClient.AppsV1().Deployments(cd.Namespace).Create(primaryDep) if err != nil { - return err + return "", err } c.Logger.With("canary", fmt.Sprintf("%s.%s", cd.Name, cd.Namespace)).Infof("Deployment %s.%s created", primaryDep.GetName(), cd.Namespace) } - return nil + return label, nil } func (c *Deployer) createPrimaryHpa(cd *flaggerv1.Canary) error { @@ -320,15 +329,25 @@ func (c *Deployer) makeAnnotations(annotations map[string]string) (map[string]st return res, nil } -func makePrimaryLabels(labels map[string]string, primaryName string) map[string]string { - idKey := "app" +// getSelectorLabel returns the selector match label +func (c *Deployer) getSelectorLabel(deployment *appsv1.Deployment) (string, error) { + for _, l := range c.Labels { + if _, ok := deployment.Spec.Selector.MatchLabels[l]; ok { + return l, nil + } + } + + return "", fmt.Errorf("selector not found") +} + +func makePrimaryLabels(labels map[string]string, primaryName string, label string) map[string]string { res := make(map[string]string) for k, v := range labels { - if k != idKey { + if k != label { res[k] = v } } - res[idKey] = primaryName + res[label] = primaryName return res } diff --git a/pkg/canary/deployer_test.go b/pkg/canary/deployer_test.go index fc53558af..d978c78c8 100644 --- a/pkg/canary/deployer_test.go +++ b/pkg/canary/deployer_test.go @@ -9,7 +9,7 @@ import ( func TestCanaryDeployer_Sync(t *testing.T) { mocks := SetupMocks() - err := mocks.deployer.Initialize(mocks.canary) + _, err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -95,7 +95,7 @@ func TestCanaryDeployer_Sync(t *testing.T) { func TestCanaryDeployer_IsNewSpec(t *testing.T) { mocks := SetupMocks() - err := mocks.deployer.Initialize(mocks.canary) + _, err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -118,7 +118,7 @@ func TestCanaryDeployer_IsNewSpec(t *testing.T) { func TestCanaryDeployer_Promote(t *testing.T) { mocks := SetupMocks() - err := mocks.deployer.Initialize(mocks.canary) + _, err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -163,7 +163,7 @@ func TestCanaryDeployer_Promote(t *testing.T) { func TestCanaryDeployer_IsReady(t *testing.T) { mocks := SetupMocks() - err := mocks.deployer.Initialize(mocks.canary) + _, err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Error("Expected primary readiness check to fail") } @@ -181,7 +181,7 @@ func TestCanaryDeployer_IsReady(t *testing.T) { func TestCanaryDeployer_SetFailedChecks(t *testing.T) { mocks := SetupMocks() - err := mocks.deployer.Initialize(mocks.canary) + _, err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -203,7 +203,7 @@ func TestCanaryDeployer_SetFailedChecks(t *testing.T) { func TestCanaryDeployer_SetState(t *testing.T) { mocks := SetupMocks() - err := mocks.deployer.Initialize(mocks.canary) + _, err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -225,7 +225,7 @@ func TestCanaryDeployer_SetState(t *testing.T) { func TestCanaryDeployer_SyncStatus(t *testing.T) { mocks := SetupMocks() - err := mocks.deployer.Initialize(mocks.canary) + _, err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } @@ -264,7 +264,7 @@ func TestCanaryDeployer_SyncStatus(t *testing.T) { func TestCanaryDeployer_Scale(t *testing.T) { mocks := SetupMocks() - err := mocks.deployer.Initialize(mocks.canary) + _, err := mocks.deployer.Initialize(mocks.canary) if err != nil { t.Fatal(err.Error()) } diff --git a/pkg/canary/mock.go b/pkg/canary/mock.go index 17a434aff..f60444e89 100644 --- a/pkg/canary/mock.go +++ b/pkg/canary/mock.go @@ -46,6 +46,7 @@ func SetupMocks() Mocks { FlaggerClient: flaggerClient, KubeClient: kubeClient, Logger: logger, + Labels: []string{"app", "name"}, ConfigTracker: ConfigTracker{ Logger: logger, KubeClient: kubeClient, @@ -222,13 +223,13 @@ func newTestDeployment() *appsv1.Deployment { Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "app": "podinfo", + "name": "podinfo", }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - "app": "podinfo", + "name": "podinfo", }, }, Spec: corev1.PodSpec{ @@ -341,13 +342,13 @@ func newTestDeploymentV2() *appsv1.Deployment { Spec: appsv1.DeploymentSpec{ Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ - "app": "podinfo", + "name": "podinfo", }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ Labels: map[string]string{ - "app": "podinfo", + "name": "podinfo", }, }, Spec: corev1.PodSpec{ diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 093884313..45c3d938d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -60,6 +60,7 @@ func NewController( notifier *notifier.Slack, meshProvider string, version string, + labels []string, ) *Controller { logger.Debug("Creating event broadcaster") flaggerscheme.AddToScheme(scheme.Scheme) @@ -75,6 +76,7 @@ func NewController( Logger: logger, KubeClient: kubeClient, FlaggerClient: flaggerClient, + Labels: labels, ConfigTracker: canary.ConfigTracker{ Logger: logger, KubeClient: kubeClient, diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 2954d1253..8946773ad 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -69,6 +69,7 @@ func SetupMocks(abtest bool) Mocks { Logger: logger, KubeClient: kubeClient, FlaggerClient: flaggerClient, + Labels: []string{"app", "name"}, ConfigTracker: canary.ConfigTracker{ Logger: logger, KubeClient: kubeClient, diff --git a/pkg/controller/scheduler.go b/pkg/controller/scheduler.go index b2602bab3..85cdb59f6 100644 --- a/pkg/controller/scheduler.go +++ b/pkg/controller/scheduler.go @@ -90,7 +90,8 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh primaryName := fmt.Sprintf("%s-primary", cd.Spec.TargetRef.Name) // create primary deployment and hpa if needed - if err := c.deployer.Initialize(cd); err != nil { + label, err := c.deployer.Initialize(cd) + if err != nil { c.recordEventWarningf(cd, "%v", err) return } @@ -100,7 +101,7 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh meshRouter := routerFactory.MeshRouter(c.meshProvider) // create or update ClusterIP services - if err := routerFactory.KubernetesRouter().Reconcile(cd); err != nil { + if err := routerFactory.KubernetesRouter(label).Reconcile(cd); err != nil { c.recordEventWarningf(cd, "%v", err) return } diff --git a/pkg/router/factory.go b/pkg/router/factory.go index 4a8793927..31f5d6089 100644 --- a/pkg/router/factory.go +++ b/pkg/router/factory.go @@ -26,11 +26,12 @@ func NewFactory(kubeClient kubernetes.Interface, } // KubernetesRouter returns a ClusterIP service router -func (factory *Factory) KubernetesRouter() *KubernetesRouter { +func (factory *Factory) KubernetesRouter(label string) *KubernetesRouter { return &KubernetesRouter{ logger: factory.logger, flaggerClient: factory.flaggerClient, kubeClient: factory.kubeClient, + label: label, } } diff --git a/pkg/router/kubernetes.go b/pkg/router/kubernetes.go index 38d6165bd..2d38dfe6f 100644 --- a/pkg/router/kubernetes.go +++ b/pkg/router/kubernetes.go @@ -19,6 +19,7 @@ type KubernetesRouter struct { kubeClient kubernetes.Interface flaggerClient clientset.Interface logger *zap.SugaredLogger + label string } // Reconcile creates or updates the primary and canary services @@ -64,7 +65,7 @@ func (c *KubernetesRouter) reconcileService(canary *flaggerv1.Canary, name strin svcSpec := corev1.ServiceSpec{ Type: corev1.ServiceTypeClusterIP, - Selector: map[string]string{"app": target}, + Selector: map[string]string{c.label: target}, Ports: []corev1.ServicePort{ { Name: portName, From 65f716182bb8aff68df3d995bdfb066ab67254d3 Mon Sep 17 00:00:00 2001 From: stefanprodan Date: Mon, 15 Apr 2019 13:30:58 +0300 Subject: [PATCH 4/4] Add default selectors to docs --- docs/gitbook/how-it-works.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/gitbook/how-it-works.md b/docs/gitbook/how-it-works.md index 598e299aa..2b325e506 100644 --- a/docs/gitbook/how-it-works.md +++ b/docs/gitbook/how-it-works.md @@ -96,6 +96,9 @@ spec: app: podinfo ``` +Besides `app` Flagger supports `name` and `app.kubernetes.io/name` selectors. If you use a different +convention you can specify your label with the `-selector-labels` flag. + The target deployment should expose a TCP port that will be used by Flagger to create the ClusterIP Service and the Istio Virtual Service. The container port from the target deployment should match the `service.port` value.