diff --git a/api/v1beta1/rollout_types.go b/api/v1beta1/rollout_types.go index 7b8ff331..c0b862e7 100644 --- a/api/v1beta1/rollout_types.go +++ b/api/v1beta1/rollout_types.go @@ -17,6 +17,9 @@ limitations under the License. package v1beta1 import ( + "reflect" + + apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" @@ -92,6 +95,25 @@ func (r *RolloutStrategy) GetRollingStyle() RollingStyleType { return PartitionRollingStyle } +// IsRealPartition reports whether the rollout is genuinely partition-style: it returns false when Strategy.IsEmptyRelease() is true, +// when the rolling style is blue-green, or when the style is canary and the workloadRef is an apps/v1 Deployment; otherwise it returns true. +// The single field EnableExtraWorkloadForCanary is not enough to decide this: for example, a v1alpha1 Rollout can be converted to a v1beta1 Rollout with EnableExtraWorkloadForCanary set to true even if the objectRef is a CloneSet (which doesn't support canary release). +func IsRealPartition(rollout *Rollout) bool { + if rollout.Spec.Strategy.IsEmptyRelease() { + return false + } + estimation := rollout.Spec.Strategy.GetRollingStyle() + if estimation == BlueGreenRollingStyle { + return false + } + targetRef := rollout.Spec.WorkloadRef + if targetRef.APIVersion == apps.SchemeGroupVersion.String() && targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() && + estimation == CanaryRollingStyle { + return false + } + return true +} + // r.GetRollingStyle() == BlueGreenRollingStyle func (r *RolloutStrategy) IsBlueGreenRelease() bool { return r.GetRollingStyle() == BlueGreenRollingStyle diff --git a/pkg/controller/rollout/rollout_canary.go b/pkg/controller/rollout/rollout_canary.go index c814a750..11ad6000 100644 --- a/pkg/controller/rollout/rollout_canary.go +++ b/pkg/controller/rollout/rollout_canary.go @@ -31,6 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -75,6 +76,7 @@ func (m *canaryReleaseManager) runCanary(c 
*RolloutContext) error { klog.Infof("rollout(%s/%s) canary step jumped", c.Rollout.Namespace, c.Rollout.Name) return nil } + gracePeriodSeconds := util.GracePeriodSecondsOrDefault(c.Rollout.Spec.Strategy.GetTrafficRouting(), defaultGracePeriodSeconds) // When the first batch is trafficRouting rolling and the next steps are rolling release, // We need to clean up the canary-related resources first and then rollout the rest of the batch. currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1] @@ -86,7 +88,7 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error { return err } else if !done { klog.Infof("rollout(%s/%s) cleaning up canary-related resources", c.Rollout.Namespace, c.Rollout.Name) - expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second) + expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second) c.RecheckTime = &expectedTime return nil } @@ -94,10 +96,68 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error { switch canaryStatus.CurrentStepState { // before CanaryStepStateUpgrade, handle some special cases, to prevent traffic loss case v1beta1.CanaryStepStateInit: - // placeholder for the later traffic modification Pull Request - canaryStatus.NextStepIndex = util.NextBatchIndex(c.Rollout, canaryStatus.CurrentStepIndex) + klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateInit) + tr := newTrafficRoutingContext(c) + if currentStep.Traffic == nil && len(currentStep.Matches) == 0 { + canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade + klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name, + canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState) + return nil + } + + /* + The following check serves to bypass the bug in ingress-nginx controller 
https://github.com/kubernetes/ingress-nginx/issues/9635 + For partition-style: if the expected replicas of the current rollout step is not less than workload.spec.replicas, + it indicates that this step will release all stable pods to the new version, i.e. there will be no stable pods, which will + trigger the bug. + To avoid this issue, we restore the stable Service before scaling the stable pods down to zero. + This ensures that the backends behind the stable ingress remain active, preventing the bug from being triggered. + */ + expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true) + if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) { + klog.Infof("special case detected: rollout(%s/%s) restore stable Service", c.Rollout.Namespace, c.Rollout.Name) + done, err := m.trafficRoutingManager.RestoreStableService(tr) + if err != nil { + return err + } else if !done { + expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second) + c.RecheckTime = &expectedTime + return nil + } + } + + /* + The following check is used to solve a scenario like this: + steps: + - replicas: 1 # first batch + matches: + - headers: + - name: user-agent + type: Exact + value: pc + in the first batch, pods with the new version will be created in step CanaryStepStateUpgrade; once ready, + they will serve as active backends behind the stable service, because the stable service hasn't been + modified by rollout (i.e. it selects pods of all versions). + Thus, requests with or without the header (user-agent: pc) will be routed to pods of all versions evenly, before + we arrive at the CanaryStepStateTrafficRouting step. + To avoid this issue, we patch the selector of the stable Service before the CanaryStepStateUpgrade step. 
+ */ + if canaryStatus.CurrentStepIndex == 1 { + klog.Infof("special case detected: rollout(%s/%s) patch stable Service", c.Rollout.Namespace, c.Rollout.Name) + done, err := m.trafficRoutingManager.PatchStableService(tr) + if err != nil { + return err + } else if !done { + expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second) + c.RecheckTime = &expectedTime + return nil + } + } + + canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()} canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade - fallthrough + klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name, + canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState) case v1beta1.CanaryStepStateUpgrade: klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateUpgrade) @@ -106,6 +166,12 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error { return err } else if done { canaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting + // if it is partition style and the last batch, we can skip the CanaryStepStateTrafficRouting step + // to bypass the bug mentioned above + expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true) + if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) { + canaryStatus.CurrentStepState = v1beta1.CanaryStepStateMetricsAnalysis + } canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()} klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState) @@ -124,7 +190,7 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error { klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.CurrentStepIndex, 
v1beta1.CanaryStepStateTrafficRouting, canaryStatus.CurrentStepState) } - expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second) + expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second) c.RecheckTime = &expectedTime case v1beta1.CanaryStepStateMetricsAnalysis: @@ -217,6 +283,12 @@ func (m *canaryReleaseManager) doCanaryPaused(c *RolloutContext) (bool, error) { canaryStatus := c.NewStatus.CanaryStatus currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1] steps := len(c.Rollout.Spec.Strategy.Canary.Steps) + // If it is the last step, and 100% of pods, then return true + if int32(steps) == canaryStatus.CurrentStepIndex { + if currentStep.Replicas != nil && currentStep.Replicas.StrVal == "100%" { + return true, nil + } + } cond := util.GetRolloutCondition(*c.NewStatus, v1beta1.RolloutConditionProgressing) // need manual confirmation if currentStep.Pause.Duration == nil { diff --git a/pkg/controller/rollout/rollout_canary_test.go b/pkg/controller/rollout/rollout_canary_test.go index aebcfeef..cbefcdc1 100644 --- a/pkg/controller/rollout/rollout_canary_test.go +++ b/pkg/controller/rollout/rollout_canary_test.go @@ -63,6 +63,7 @@ func TestRunCanary(t *testing.T) { obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1" obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547" obj.Status.CanaryStatus.CurrentStepIndex = 1 + obj.Status.CanaryStatus.NextStepIndex = 2 obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing) cond.Reason = v1alpha1.ProgressingReasonInRolling @@ -76,6 +77,7 @@ func TestRunCanary(t *testing.T) { s.CanaryStatus.StableRevision = "pod-template-hash-v1" s.CanaryStatus.CanaryRevision = "6f8cc56547" s.CanaryStatus.CurrentStepIndex = 1 + s.CanaryStatus.NextStepIndex = 2 s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade cond := 
util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing) cond.Reason = v1alpha1.ProgressingReasonInRolling @@ -139,6 +141,7 @@ func TestRunCanary(t *testing.T) { obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1" obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547" obj.Status.CanaryStatus.CurrentStepIndex = 1 + obj.Status.CanaryStatus.NextStepIndex = 2 obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing) cond.Reason = v1alpha1.ProgressingReasonInRolling @@ -185,6 +188,7 @@ func TestRunCanary(t *testing.T) { s.CanaryStatus.CanaryReplicas = 1 s.CanaryStatus.CanaryReadyReplicas = 1 s.CanaryStatus.CurrentStepIndex = 1 + s.CanaryStatus.NextStepIndex = 2 s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing) cond.Reason = v1alpha1.ProgressingReasonInRolling @@ -290,6 +294,7 @@ func TestRunCanaryPaused(t *testing.T) { obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1" obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547" obj.Status.CanaryStatus.CurrentStepIndex = 3 + obj.Status.CanaryStatus.NextStepIndex = 4 obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2" obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused return obj @@ -301,6 +306,7 @@ func TestRunCanaryPaused(t *testing.T) { obj.CanaryStatus.StableRevision = "pod-template-hash-v1" obj.CanaryStatus.CanaryRevision = "6f8cc56547" obj.CanaryStatus.CurrentStepIndex = 3 + obj.CanaryStatus.NextStepIndex = 4 obj.CanaryStatus.PodTemplateHash = "pod-template-hash-v2" obj.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused return obj diff --git a/pkg/trafficrouting/manager.go b/pkg/trafficrouting/manager.go index 0907f732..5b34b2bb 100644 --- a/pkg/trafficrouting/manager.go +++ b/pkg/trafficrouting/manager.go @@ -126,7 +126,7 @@ func (m *Manager) 
DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { if c.LastUpdateTime != nil { // wait seconds for network providers to consume the modification about workload, service and so on. if verifyTime := c.LastUpdateTime.Add(time.Second * time.Duration(trafficRouting.GracePeriodSeconds)); verifyTime.After(time.Now()) { - klog.Infof("%s update workload or service selector, and wait 3 seconds", c.Key) + klog.Infof("%s update workload or service selector, and wait %d seconds", c.Key, trafficRouting.GracePeriodSeconds) return false, nil } } @@ -139,6 +139,7 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { klog.Warningf("%s stableRevision or podTemplateHash can not be empty, and wait a moment", c.Key) return false, nil } + serviceModified := false // fetch canary service err = m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: canaryServiceName}, canaryService) if err != nil && !errors.IsNotFound(err) { @@ -149,21 +150,19 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { if err != nil { return false, err } - } - - serviceModified := false - // patch canary service to only select the canary pods - if canaryService.Spec.Selector[c.RevisionLabelKey] != c.CanaryRevision { + serviceModified = true + } else if canaryService.Spec.Selector[c.RevisionLabelKey] != c.CanaryRevision { + // patch canary service to only select the canary pods body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.CanaryRevision) if err = m.Patch(context.TODO(), canaryService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil { klog.Errorf("%s patch canary service(%s) selector failed: %s", c.Key, canaryService.Name, err.Error()) return false, err } - serviceModified = true // update canary service time, and wait 3 seconds, just to be safe c.LastUpdateTime = &metav1.Time{Time: time.Now()} klog.Infof("%s patch canary service(%s) selector(%s=%s) success", c.Key, 
canaryService.Name, c.RevisionLabelKey, c.CanaryRevision) + serviceModified = true } // patch stable service to only select the stable pods if stableService.Spec.Selector[c.RevisionLabelKey] != c.StableRevision { @@ -181,6 +180,13 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { if serviceModified { return false, nil } + } else if c.DisableGenerateCanaryService { + // if DisableGenerateCanaryService is on, selector is not needed, we should remove it + // it's necessary because selector probably has been patched in CanaryStepStateInit step within runCanary function + verify, err := m.restoreStableService(c) + if err != nil || !verify { + return false, err + } } // new network provider, ingress or gateway @@ -217,9 +223,16 @@ func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext, onlyRestore } cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}} + // In rollout failure case, no canary-service will be created, but stable service may be patched + // if and only if both of following conditions are met, we can just return + // 1. canary service has been already cleaned up + // 2. 
stable service has no selector + stableService := &corev1.Service{} + err = m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService) + stableServiceRestored := errors.IsNotFound(err) || (err == nil && stableService.Spec.Selector[c.RevisionLabelKey] == "") // if canary svc has been already cleaned up, just return // even DisableGenerateCanaryService is true, canary svc still exists, because canary service is stable service - if err = m.Get(context.TODO(), client.ObjectKeyFromObject(cService), cService); err != nil { + if err = m.Get(context.TODO(), client.ObjectKeyFromObject(cService), cService); err != nil && stableServiceRestored { if !errors.IsNotFound(err) { klog.Errorf("%s get canary service(%s) failed: %s", c.Key, cServiceName, err.Error()) return false, err @@ -377,9 +390,6 @@ func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) { return true, nil } trafficRouting := c.ObjectRef[0] - if trafficRouting.GracePeriodSeconds <= 0 { - trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds - } //fetch stable service stableService := &corev1.Service{} err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService) diff --git a/pkg/trafficrouting/manager_test.go b/pkg/trafficrouting/manager_test.go index 57546251..f8f6bf06 100644 --- a/pkg/trafficrouting/manager_test.go +++ b/pkg/trafficrouting/manager_test.go @@ -775,6 +775,7 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { getRollout: func() (*v1beta1.Rollout, *util.Workload) { obj := demoIstioRollout.DeepCopy() obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)} + obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1 return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, expectUnstructureds: func() []*unstructured.Unstructured { @@ -804,7 +805,6 @@ func TestDoTrafficRoutingWithIstio(t 
*testing.T) { objects = append(objects, u) return objects }, - // Rollout(/rollout-demo) is doing trafficRouting({"traffic":"5%"}), and wait a moment expectDone: true, }, { @@ -834,6 +834,7 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { obj := demoIstioRollout.DeepCopy() // set DisableGenerateCanaryService as true obj.Spec.Strategy.Canary.DisableGenerateCanaryService = true + obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1 obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)} return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, @@ -864,7 +865,6 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { objects = append(objects, u) return objects }, - // Rollout(/rollout-demo) is doing trafficRouting({"traffic":"5%"}), and wait a moment expectDone: true, }, } @@ -898,11 +898,22 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { if err != nil { t.Fatalf("InitializeTrafficRouting failed: %s", err) } + // now we need to wait at least 2x grace time to keep traffic stable: + // create the canary service -> grace time -> update the gateway -> grace time + // therefore, before both grace times are over, DoTrafficRouting should return false + // firstly, create the canary Service, before the grace time over, return false _, err = manager.DoTrafficRouting(c) if err != nil { t.Fatalf("DoTrafficRouting failed: %s", err) } - // may return false due to in the course of doing trafficRouting, let's do it again + time.Sleep(1 * time.Second) + // secondly, update the gateway, before the grace time over, return false + _, err = manager.DoTrafficRouting(c) + if err != nil { + t.Fatalf("DoTrafficRouting failed: %s", err) + } + time.Sleep(1 * time.Second) + // now, both grace times are over, it should be true done, err := manager.DoTrafficRouting(c) if err != nil { t.Fatalf("DoTrafficRouting failed: %s", err) @@ -986,6 +997,7 @@ func TestFinalisingTrafficRouting(t *testing.T) { obj := 
demoRollout.DeepCopy() obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted obj.Status.CanaryStatus.CurrentStepIndex = 4 + obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(time.Hour)} return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, onlyRestoreStableService: true, diff --git a/pkg/util/rollout_utils.go b/pkg/util/rollout_utils.go index 33a2b0d9..a1e67a89 100644 --- a/pkg/util/rollout_utils.go +++ b/pkg/util/rollout_utils.go @@ -190,3 +190,12 @@ func CheckNextBatchIndexWithCorrect(rollout *rolloutv1beta1.Rollout) { } } } + +// GracePeriodSecondsOrDefault returns the GracePeriodSeconds of the first traffic routing ref, or defaultSeconds +// when refs is empty or that value is not positive (a non-positive grace period is treated as unset). +func GracePeriodSecondsOrDefault(refs []rolloutv1beta1.TrafficRoutingRef, defaultSeconds int32) int32 { + if len(refs) == 0 || refs[0].GracePeriodSeconds <= 0 { + return defaultSeconds + } + return refs[0].GracePeriodSeconds +}