update runCanary traffic step for special cases
Signed-off-by: yunbo <[email protected]>
Funinu committed Jul 19, 2024
1 parent e7652cb commit 2f53045
Showing 6 changed files with 148 additions and 19 deletions.
22 changes: 22 additions & 0 deletions api/v1beta1/rollout_types.go
@@ -17,6 +17,9 @@ limitations under the License.
package v1beta1

import (
"reflect"

apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
@@ -92,6 +95,25 @@ func (r *RolloutStrategy) GetRollingStyle() RollingStyleType {
return PartitionRollingStyle
}

// Using the single field EnableExtraWorkloadForCanary to distinguish partition-style from canary-style
// is not enough. For example, a v1alpha1 Rollout can be converted to a v1beta1 Rollout
// with EnableExtraWorkloadForCanary set to true even if the workloadRef is a CloneSet (which doesn't support canary release).
func IsRealPartition(rollout *Rollout) bool {
if rollout.Spec.Strategy.IsEmptyRelease() {
return false
}
estimation := rollout.Spec.Strategy.GetRollingStyle()
if estimation == BlueGreenRollingStyle {
return false
}
targetRef := rollout.Spec.WorkloadRef
if targetRef.APIVersion == apps.SchemeGroupVersion.String() && targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() &&
estimation == CanaryRollingStyle {
return false
}
return true
}

// r.GetRollingStyle() == BlueGreenRollingStyle
func (r *RolloutStrategy) IsBlueGreenRelease() bool {
return r.GetRollingStyle() == BlueGreenRollingStyle
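A minimal standalone sketch (not part of this commit) of the decision IsRealPartition encodes; the types and string constants below are illustrative stand-ins, not the real kruise-rollout API:

package main

import "fmt"

// rolloutInfo stands in for the handful of Rollout fields the check reads.
type rolloutInfo struct {
    apiVersion string // workloadRef.apiVersion
    kind       string // workloadRef.kind
    style      string // estimated rolling style: "Canary", "Partition" or "BlueGreen"
    empty      bool   // true if the release strategy is empty
}

// isRealPartition mirrors the logic above: only a native Deployment released in
// canary style gets an extra canary workload; everything else is updated in place.
func isRealPartition(r rolloutInfo) bool {
    if r.empty || r.style == "BlueGreen" {
        return false
    }
    if r.apiVersion == "apps/v1" && r.kind == "Deployment" && r.style == "Canary" {
        return false
    }
    return true
}

func main() {
    fmt.Println(isRealPartition(rolloutInfo{"apps/v1", "Deployment", "Canary", false}))               // false
    fmt.Println(isRealPartition(rolloutInfo{"apps.kruise.io/v1alpha1", "CloneSet", "Canary", false})) // true
}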
82 changes: 77 additions & 5 deletions pkg/controller/rollout/rollout_canary.go
@@ -31,6 +31,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
@@ -75,6 +76,7 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
klog.Infof("rollout(%s/%s) canary step jumped", c.Rollout.Namespace, c.Rollout.Name)
return nil
}
gracePeriodSeconds := util.GracePeriodSecondsOrDefault(c.Rollout.Spec.Strategy.GetTrafficRouting(), defaultGracePeriodSeconds)
// When the first batch is trafficRouting rolling and the next steps are rolling release,
// we need to clean up the canary-related resources first and then roll out the rest of the batches.
currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1]
@@ -86,18 +88,76 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
return err
} else if !done {
klog.Infof("rollout(%s/%s) cleaning up canary-related resources", c.Rollout.Namespace, c.Rollout.Name)
expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}
switch canaryStatus.CurrentStepState {
// before CanaryStepStateUpgrade, handle some special cases to prevent traffic loss
case v1beta1.CanaryStepStateInit:
// placeholder for the later traffic modification Pull Request
canaryStatus.NextStepIndex = util.NextBatchIndex(c.Rollout, canaryStatus.CurrentStepIndex)
klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateInit)
tr := newTrafficRoutingContext(c)
if currentStep.Traffic == nil && len(currentStep.Matches) == 0 {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState)
return nil
}

/*
The following check serves to bypass a bug in the ingress-nginx controller: https://github.com/kubernetes/ingress-nginx/issues/9635
For partition-style: if the expected replicas of the current rollout step is not less than workload.spec.replicas,
this step will release all stable pods to the new version, i.e. there will be no stable pods left, which
triggers the bug.
To avoid this issue, we restore the stable Service before scaling the stable pods down to zero.
This ensures that the backends behind the stable ingress remain active, preventing the bug from being triggered.
*/
expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true)
if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) {
klog.Infof("special case detected: rollout(%s/%s) restore stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.RestoreStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}

/*
The following check handles a scenario like this:
steps:
- replicas: 1 # first batch
matches:
- headers:
- name: user-agent
type: Exact
value: pc
In the first batch, pods with the new version are created in the CanaryStepStateUpgrade step; once ready,
they serve as active backends behind the stable service, because the stable service hasn't been
modified by the rollout yet (i.e. it selects pods of all versions).
Thus, requests with or without the header (user-agent: pc) would be routed evenly to pods of all versions
before we arrive at the CanaryStepStateTrafficRouting step.
To avoid this issue, we patch the selector to the stable Service before the CanaryStepStateUpgrade step.
*/
if canaryStatus.CurrentStepIndex == 1 {
klog.Infof("special case detected: rollout(%s/%s) patch stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.PatchStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}

canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
fallthrough
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState)

case v1beta1.CanaryStepStateUpgrade:
klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateUpgrade)
@@ -106,6 +166,12 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
return err
} else if done {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting
// if it is partition style and the last batch, we can skip the CanaryStepStateTrafficRouting step
// to bypass the bug mentioned above
expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true)
if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateMetricsAnalysis
}
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState)
@@ -124,7 +190,7 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateTrafficRouting, canaryStatus.CurrentStepState)
}
expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime

case v1beta1.CanaryStepStateMetricsAnalysis:
@@ -217,6 +283,12 @@ func (m *canaryReleaseManager) doCanaryPaused(c *RolloutContext) (bool, error) {
canaryStatus := c.NewStatus.CanaryStatus
currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1]
steps := len(c.Rollout.Spec.Strategy.Canary.Steps)
// If this is the last step and its replicas is 100%, there is nothing left to pause for; return true
if int32(steps) == canaryStatus.CurrentStepIndex {
if currentStep.Replicas != nil && currentStep.Replicas.StrVal == "100%" {
return true, nil
}
}
cond := util.GetRolloutCondition(*c.NewStatus, v1beta1.RolloutConditionProgressing)
// need manual confirmation
if currentStep.Pause.Duration == nil {
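The special cases above (restoring the stable Service in CanaryStepStateInit and skipping CanaryStepStateTrafficRouting in the last batch) hinge on the same test: the step's expected replicas, computed with intstr.GetScaledValueFromIntOrPercent and rounded up, reaching workload.spec.replicas. A minimal sketch (not part of this commit) of that arithmetic, using assumed example values:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
    total := 5 // workload.spec.replicas (assumed example)

    steps := []intstr.IntOrString{
        intstr.FromString("20%"),  // 1 of 5: stable pods remain
        intstr.FromString("90%"),  // 4.5 rounds up to 5: no stable pods left
        intstr.FromString("100%"), // 5 of 5: no stable pods left
        intstr.FromInt(5),         // absolute replica count, same effect
    }
    for i := range steps {
        // true = round up, matching the call in runCanary above
        expected, _ := intstr.GetScaledValueFromIntOrPercent(&steps[i], total, true)
        fmt.Printf("step %-5s -> expected replicas %d, releases all stable pods: %v\n",
            steps[i].String(), expected, expected >= total)
    }
}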
6 changes: 6 additions & 0 deletions pkg/controller/rollout/rollout_canary_test.go
@@ -63,6 +63,7 @@ func TestRunCanary(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 1
obj.Status.CanaryStatus.NextStepIndex = 2
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
@@ -76,6 +77,7 @@ func TestRunCanary(t *testing.T) {
s.CanaryStatus.StableRevision = "pod-template-hash-v1"
s.CanaryStatus.CanaryRevision = "6f8cc56547"
s.CanaryStatus.CurrentStepIndex = 1
s.CanaryStatus.NextStepIndex = 2
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
@@ -139,6 +141,7 @@ func TestRunCanary(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 1
obj.Status.CanaryStatus.NextStepIndex = 2
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
@@ -185,6 +188,7 @@ func TestRunCanary(t *testing.T) {
s.CanaryStatus.CanaryReplicas = 1
s.CanaryStatus.CanaryReadyReplicas = 1
s.CanaryStatus.CurrentStepIndex = 1
s.CanaryStatus.NextStepIndex = 2
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
@@ -290,6 +294,7 @@ func TestRunCanaryPaused(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 3
obj.Status.CanaryStatus.NextStepIndex = 4
obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused
return obj
@@ -301,6 +306,7 @@ func TestRunCanaryPaused(t *testing.T) {
obj.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.CanaryStatus.CurrentStepIndex = 3
obj.CanaryStatus.NextStepIndex = 4
obj.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused
return obj
32 changes: 21 additions & 11 deletions pkg/trafficrouting/manager.go
@@ -126,7 +126,7 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if c.LastUpdateTime != nil {
// wait seconds for network providers to consume the modification about workload, service and so on.
if verifyTime := c.LastUpdateTime.Add(time.Second * time.Duration(trafficRouting.GracePeriodSeconds)); verifyTime.After(time.Now()) {
klog.Infof("%s update workload or service selector, and wait 3 seconds", c.Key)
klog.Infof("%s update workload or service selector, and wait %d seconds", c.Key, trafficRouting.GracePeriodSeconds)
return false, nil
}
}
@@ -139,6 +139,7 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
klog.Warningf("%s stableRevision or podTemplateHash can not be empty, and wait a moment", c.Key)
return false, nil
}
serviceModified := false
// fetch canary service
err = m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: canaryServiceName}, canaryService)
if err != nil && !errors.IsNotFound(err) {
@@ -149,21 +150,19 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if err != nil {
return false, err
}
}

serviceModified := false
// patch canary service to only select the canary pods
if canaryService.Spec.Selector[c.RevisionLabelKey] != c.CanaryRevision {
serviceModified = true
} else if canaryService.Spec.Selector[c.RevisionLabelKey] != c.CanaryRevision {
// patch canary service to only select the canary pods
body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.CanaryRevision)
if err = m.Patch(context.TODO(), canaryService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
klog.Errorf("%s patch canary service(%s) selector failed: %s", c.Key, canaryService.Name, err.Error())
return false, err
}
serviceModified = true
// update canary service time, and wait 3 seconds, just to be safe
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("%s patch canary service(%s) selector(%s=%s) success",
c.Key, canaryService.Name, c.RevisionLabelKey, c.CanaryRevision)
serviceModified = true
}
// patch stable service to only select the stable pods
if stableService.Spec.Selector[c.RevisionLabelKey] != c.StableRevision {
@@ -181,6 +180,13 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if serviceModified {
return false, nil
}
} else if c.DisableGenerateCanaryService {
// if DisableGenerateCanaryService is on, the selector is not needed, so we should remove it.
// This is necessary because the selector may have been patched in the CanaryStepStateInit step within the runCanary function.
verify, err := m.restoreStableService(c)
if err != nil || !verify {
return false, err
}
}

// new network provider, ingress or gateway
@@ -217,9 +223,16 @@ func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext, onlyRestore
}

cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}}
// In the rollout failure case, no canary Service is created, but the stable Service may have been patched.
// We can return early if and only if both of the following conditions are met:
// 1. the canary Service has already been cleaned up
// 2. the stable Service has no selector
stableService := &corev1.Service{}
err = m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
stableServiceRestored := errors.IsNotFound(err) || (err == nil && stableService.Spec.Selector[c.RevisionLabelKey] == "")
// if the canary svc has already been cleaned up, just return
// even if DisableGenerateCanaryService is true, the canary svc still exists, because the canary Service is the stable Service
if err = m.Get(context.TODO(), client.ObjectKeyFromObject(cService), cService); err != nil {
if err = m.Get(context.TODO(), client.ObjectKeyFromObject(cService), cService); err != nil && stableServiceRestored {
if !errors.IsNotFound(err) {
klog.Errorf("%s get canary service(%s) failed: %s", c.Key, cServiceName, err.Error())
return false, err
@@ -377,9 +390,6 @@ func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}
// fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
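For reference, a minimal sketch (not part of this commit) of the selector patches this file builds. The label key and revision are assumed example values; the pin patch is the body constructed in DoTrafficRouting above, while the restore patch shows the usual strategic-merge way to drop a selector key, not necessarily the exact body restoreStableService uses:

package main

import "fmt"

func main() {
    revisionLabelKey := "pod-template-hash" // assumed example of c.RevisionLabelKey
    canaryRevision := "6f8cc56547"          // assumed example of c.CanaryRevision

    // Pin the Service to a single revision, as DoTrafficRouting does for the canary Service.
    pin := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, revisionLabelKey, canaryRevision)
    fmt.Println(pin) // {"spec":{"selector":{"pod-template-hash":"6f8cc56547"}}}

    // Restoring is the inverse: in a strategic merge patch, setting the key to null
    // removes it, so the Service selects pods of every revision again.
    restore := fmt.Sprintf(`{"spec":{"selector":{"%s":null}}}`, revisionLabelKey)
    fmt.Println(restore) // {"spec":{"selector":{"pod-template-hash":null}}}
}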
18 changes: 15 additions & 3 deletions pkg/trafficrouting/manager_test.go
@@ -775,6 +775,7 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) {
getRollout: func() (*v1beta1.Rollout, *util.Workload) {
obj := demoIstioRollout.DeepCopy()
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)}
obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
expectUnstructureds: func() []*unstructured.Unstructured {
@@ -804,7 +805,6 @@
objects = append(objects, u)
return objects
},
// Rollout(/rollout-demo) is doing trafficRouting({"traffic":"5%"}), and wait a moment
expectDone: true,
},
{
@@ -834,6 +834,7 @@
obj := demoIstioRollout.DeepCopy()
// set DisableGenerateCanaryService as true
obj.Spec.Strategy.Canary.DisableGenerateCanaryService = true
obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
@@ -864,7 +865,6 @@
objects = append(objects, u)
return objects
},
// Rollout(/rollout-demo) is doing trafficRouting({"traffic":"5%"}), and wait a moment
expectDone: true,
},
}
@@ -898,11 +898,22 @@
if err != nil {
t.Fatalf("InitializeTrafficRouting failed: %s", err)
}
// now we need to wait at least 2x the grace time to keep traffic stable:
// create the canary service -> grace time -> update the gateway -> grace time
// therefore, until both grace times are over, DoTrafficRouting should return false
// first, create the canary Service; before its grace time is over, return false
_, err = manager.DoTrafficRouting(c)
if err != nil {
t.Fatalf("DoTrafficRouting failed: %s", err)
}
// may return false because trafficRouting is still in progress, so do it again
time.Sleep(1 * time.Second)
// second, update the gateway; before its grace time is over, return false
_, err = manager.DoTrafficRouting(c)
if err != nil {
t.Fatalf("DoTrafficRouting failed: %s", err)
}
time.Sleep(1 * time.Second)
// now both grace times are over, so it should return true
done, err := manager.DoTrafficRouting(c)
if err != nil {
t.Fatalf("DoTrafficRouting failed: %s", err)
@@ -986,6 +997,7 @@ func TestFinalisingTrafficRouting(t *testing.T) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(time.Hour)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
onlyRestoreStableService: true,
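The timing these Istio tests rely on boils down to the verifyTime gate in DoTrafficRouting: after each modification the manager refuses to make progress until GracePeriodSeconds have elapsed since LastUpdateTime. A minimal standalone sketch (not part of this commit) of that gate, with GracePeriodSeconds set to 1 as in the fixtures above:

package main

import (
    "fmt"
    "time"
)

// graceElapsed mirrors the verifyTime check: progress is allowed only once
// lastUpdate + gracePeriodSeconds is no longer in the future.
func graceElapsed(lastUpdate time.Time, gracePeriodSeconds int32, now time.Time) bool {
    verifyTime := lastUpdate.Add(time.Duration(gracePeriodSeconds) * time.Second)
    return !verifyTime.After(now)
}

func main() {
    last := time.Now()
    fmt.Println(graceElapsed(last, 1, last.Add(500*time.Millisecond))) // false: still inside the grace window
    fmt.Println(graceElapsed(last, 1, last.Add(2*time.Second)))        // true: safe to continue
}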
7 changes: 7 additions & 0 deletions pkg/util/rollout_utils.go
@@ -190,3 +190,10 @@ func CheckNextBatchIndexWithCorrect(rollout *rolloutv1beta1.Rollout) {
}
}
}

func GracePeriodSecondsOrDefault(refs []rolloutv1beta1.TrafficRoutingRef, defaultSeconds int32) int32 {
if len(refs) == 0 {
return defaultSeconds
}
return refs[0].GracePeriodSeconds
}
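A minimal standalone usage sketch (not part of this commit), with a stand-in type in place of rolloutv1beta1.TrafficRoutingRef:

package main

import "fmt"

// trafficRoutingRef stands in for rolloutv1beta1.TrafficRoutingRef; only the
// field read by the helper is included.
type trafficRoutingRef struct {
    GracePeriodSeconds int32
}

func gracePeriodSecondsOrDefault(refs []trafficRoutingRef, defaultSeconds int32) int32 {
    if len(refs) == 0 {
        return defaultSeconds
    }
    return refs[0].GracePeriodSeconds
}

func main() {
    fmt.Println(gracePeriodSecondsOrDefault(nil, 3))                                          // 3: no traffic routing, use the default
    fmt.Println(gracePeriodSecondsOrDefault([]trafficRoutingRef{{GracePeriodSeconds: 1}}, 3)) // 1: first ref wins
}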
