Skip to content

Commit

Permalink
traffic: do traffic routing improve
Browse files Browse the repository at this point in the history
Signed-off-by: yunbo <[email protected]>

traffic: init traffic check int-typed replicas

Signed-off-by: yunbo <[email protected]>

others: add traffic setting check for partition step to webhook

Signed-off-by: yunbo <[email protected]>
  • Loading branch information
Funinu committed Jul 12, 2024
1 parent 7633254 commit caca0f3
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 9 deletions.
30 changes: 30 additions & 0 deletions api/v1beta1/rollout_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ limitations under the License.
package v1beta1

import (
"reflect"

apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand All @@ -41,6 +44,14 @@ const (
// RollbackInBatchAnnotation is set to rollout annotations.
// RollbackInBatchAnnotation allow use disable quick rollback, and will roll back in batch style.
RollbackInBatchAnnotation = "rollouts.kruise.io/rollback-in-batch"

// PartitionReplicasLimitWithTraffic is set to rollout annotations.
// PartitionReplicasLimitWithTraffic represents the maximum percentage of replicas
// allowed for a step of partition-style release, with traffic/matches specified.
// If a step is configured with a number of replicas exceeding this percentage, the traffic strategy for that step
// must not be specified. If this rule is violated, the Rollout webhook will block the creation or modification of the Rollout.
// The default limit is set to 30%.
PartitionReplicasLimitWithTraffic = "rollouts.kruise.io/partition-replicas-limit"
)

// RolloutSpec defines the desired state of Rollout
Expand Down Expand Up @@ -92,6 +103,25 @@ func (r *RolloutStrategy) GetRollingStyle() RollingStyleType {
return PartitionRollingStyle
}

// using single field EnableExtraWorkloadForCanary to distinguish partition-style from canary-style
// is not enough, for example, a v1alaph1 Rollout can be converted to v1beta1 Rollout
// with EnableExtraWorkloadForCanary set as true, even the objectRef is cloneset (which doesn't support canary release)
func IsRealPartition(rollout *Rollout) bool {
if rollout.Spec.Strategy.IsEmptyRelease() {
return false
}
estimation := rollout.Spec.Strategy.GetRollingStyle()
if estimation == BlueGreenRollingStyle {
return false
}
targetRef := rollout.Spec.WorkloadRef
if targetRef.APIVersion == apps.SchemeGroupVersion.String() && targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() &&
estimation == CanaryRollingStyle {
return false
}
return true
}

// r.GetRollingStyle() == BlueGreenRollingStyle
func (r *RolloutStrategy) IsBlueGreenRelease() bool {
return r.GetRollingStyle() == BlueGreenRollingStyle
Expand Down
70 changes: 67 additions & 3 deletions pkg/controller/rollout/rollout_canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -94,10 +95,61 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
switch canaryStatus.CurrentStepState {
// before CanaryStepStateUpgrade, handle some special cases, to prevent traffic loss
case v1beta1.CanaryStepStateInit:
// placeholder for the later traffic modification Pull Request
canaryStatus.NextStepIndex = util.NextBatchIndex(c.Rollout, canaryStatus.CurrentStepIndex)
klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateInit)
tr := newTrafficRoutingContext(c)
if currentStep.Traffic == nil && len(currentStep.Matches) == 0 {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState)
return nil
}

/*
The following check is used to bypass the bug in ingress-nginx controller https://github.com/kubernetes/ingress-nginx/issues/9635
for partition release, if expected replicas of currentStep isn't less than the workload sepc.replicas,
we can assume that all traffic should be routed to canary pods
*/
expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true)
if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) {
klog.Infof("special case detected: rollout(%s/%s) restore stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.RestoreStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}

/*
The following check is used to solve scenario like this:
steps:
- replicas: 1 # frist batch
matches:
- headers:
- name: user-agent
type: Exact
value: pc
we should patch selector to stable Service before CanaryStepStateUpgrade when in the first batch
otherwise, some traffic will loss between CanaryStepStateUpgrade and CanaryStepStateTrafficRouting
*/
if canaryStatus.CurrentStepIndex == 1 {
klog.Infof("special case detected: rollout(%s/%s) patch stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.PatchStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}

canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
fallthrough
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState)

case v1beta1.CanaryStepStateUpgrade:
klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateUpgrade)
Expand All @@ -106,6 +158,12 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
return err
} else if done {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting
// if it is partition style and the last batch, we can skip the CanaryStepStateTrafficRouting step
// to bypass the bug mentioned above
expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true)
if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateMetricsAnalysis
}
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState)
Expand Down Expand Up @@ -217,6 +275,12 @@ func (m *canaryReleaseManager) doCanaryPaused(c *RolloutContext) (bool, error) {
canaryStatus := c.NewStatus.CanaryStatus
currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1]
steps := len(c.Rollout.Spec.Strategy.Canary.Steps)
// If it is the last step, and 100% of pods, then return true
if int32(steps) == canaryStatus.CurrentStepIndex {
if currentStep.Replicas != nil && currentStep.Replicas.StrVal == "100%" {
return true, nil
}
}
cond := util.GetRolloutCondition(*c.NewStatus, v1beta1.RolloutConditionProgressing)
// need manual confirmation
if currentStep.Pause.Duration == nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/rollout/rollout_canary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestRunCanary(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 1
obj.Status.CanaryStatus.NextStepIndex = 2
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
Expand All @@ -76,6 +77,7 @@ func TestRunCanary(t *testing.T) {
s.CanaryStatus.StableRevision = "pod-template-hash-v1"
s.CanaryStatus.CanaryRevision = "6f8cc56547"
s.CanaryStatus.CurrentStepIndex = 1
s.CanaryStatus.NextStepIndex = 2
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
Expand Down Expand Up @@ -139,6 +141,7 @@ func TestRunCanary(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 1
obj.Status.CanaryStatus.NextStepIndex = 2
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
Expand Down Expand Up @@ -185,6 +188,7 @@ func TestRunCanary(t *testing.T) {
s.CanaryStatus.CanaryReplicas = 1
s.CanaryStatus.CanaryReadyReplicas = 1
s.CanaryStatus.CurrentStepIndex = 1
s.CanaryStatus.NextStepIndex = 2
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
Expand Down Expand Up @@ -290,6 +294,7 @@ func TestRunCanaryPaused(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 3
obj.Status.CanaryStatus.NextStepIndex = 4
obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused
return obj
Expand All @@ -301,6 +306,7 @@ func TestRunCanaryPaused(t *testing.T) {
obj.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.CanaryStatus.CurrentStepIndex = 3
obj.CanaryStatus.NextStepIndex = 4
obj.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused
return obj
Expand Down
20 changes: 18 additions & 2 deletions pkg/trafficrouting/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if c.LastUpdateTime != nil {
// wait seconds for network providers to consume the modification about workload, service and so on.
if verifyTime := c.LastUpdateTime.Add(time.Second * time.Duration(trafficRouting.GracePeriodSeconds)); verifyTime.After(time.Now()) {
klog.Infof("%s update workload or service selector, and wait 3 seconds", c.Key)
klog.Infof("%s update workload or service selector, and wait %d seconds", c.Key, trafficRouting.GracePeriodSeconds)
return false, nil
}
}
Expand All @@ -139,6 +139,15 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
klog.Warningf("%s stableRevision or podTemplateHash can not be empty, and wait a moment", c.Key)
return false, nil
}
/*
Why is the serviceModified flag moved here?
The rationale behind this is that when we create a canary Service, it is already instantiated with the appropriate selector.
If the stable Service also has had selector patched previously, the logic will proceed to the EnsureRoutes function uninterrupted.
This actually means: Creating a new Service and updating the gateway resource occurs within a single reconciliation loop,
which might introduce instability.
Therefore, by moving the serviceModified flag here, we introduce a grace period between these two operations to ensure stability.
*/
serviceModified := false
// fetch canary service
err = m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: canaryServiceName}, canaryService)
if err != nil && !errors.IsNotFound(err) {
Expand All @@ -149,9 +158,9 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if err != nil {
return false, err
}
serviceModified = true
}

serviceModified := false
// patch canary service to only select the canary pods
if canaryService.Spec.Selector[c.RevisionLabelKey] != c.CanaryRevision {
body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.CanaryRevision)
Expand Down Expand Up @@ -181,6 +190,13 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if serviceModified {
return false, nil
}
} else if c.DisableGenerateCanaryService {
// if DisableGenerateCanaryService is on, selector is not needed, we should remove it
// it's necessary because selector probably has been patched in CanaryStepStateInit step within runCanary function
verify, err := m.restoreStableService(c)
if err != nil || !verify {
return false, err
}
}

// new network provider, ingress or gateway
Expand Down
18 changes: 15 additions & 3 deletions pkg/trafficrouting/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) {
getRollout: func() (*v1beta1.Rollout, *util.Workload) {
obj := demoIstioRollout.DeepCopy()
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)}
obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
expectUnstructureds: func() []*unstructured.Unstructured {
Expand Down Expand Up @@ -804,7 +805,6 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) {
objects = append(objects, u)
return objects
},
// Rollout(/rollout-demo) is doing trafficRouting({"traffic":"5%"}), and wait a moment
expectDone: true,
},
{
Expand Down Expand Up @@ -834,6 +834,7 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) {
obj := demoIstioRollout.DeepCopy()
// set DisableGenerateCanaryService as true
obj.Spec.Strategy.Canary.DisableGenerateCanaryService = true
obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
Expand Down Expand Up @@ -864,7 +865,6 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) {
objects = append(objects, u)
return objects
},
// Rollout(/rollout-demo) is doing trafficRouting({"traffic":"5%"}), and wait a moment
expectDone: true,
},
}
Expand Down Expand Up @@ -898,11 +898,22 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) {
if err != nil {
t.Fatalf("InitializeTrafficRouting failed: %s", err)
}
// now we need to wait at least 2x grace time to keep traffic stable:
// create the canary service -> grace time -> update the gateway -> grace time
// therefore, before both grace times are over, DoTrafficRouting should return false
// firstly, create the canary Service, before the grace time over, return false
_, err = manager.DoTrafficRouting(c)
if err != nil {
t.Fatalf("DoTrafficRouting failed: %s", err)
}
// may return false due to in the course of doing trafficRouting, let's do it again
time.Sleep(1 * time.Second)
// secondly, update the gateway, before the grace time over, return false
_, err = manager.DoTrafficRouting(c)
if err != nil {
t.Fatalf("DoTrafficRouting failed: %s", err)
}
time.Sleep(1 * time.Second)
// now, both grace times are over, it should be true
done, err := manager.DoTrafficRouting(c)
if err != nil {
t.Fatalf("DoTrafficRouting failed: %s", err)
Expand Down Expand Up @@ -986,6 +997,7 @@ func TestFinalisingTrafficRouting(t *testing.T) {
obj := demoRollout.DeepCopy()
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted
obj.Status.CanaryStatus.CurrentStepIndex = 4
obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(time.Hour)}
return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey}
},
onlyRestoreStableService: true,
Expand Down
30 changes: 29 additions & 1 deletion pkg/webhook/rollout/validating/rollout_create_update_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ type RolloutCreateUpdateHandler struct {

var _ admission.Handler = &RolloutCreateUpdateHandler{}

// default is 30%
const defaultReplicasLimitWithTraffic = 30

var replicasLimitWithTraffic = defaultReplicasLimitWithTraffic
var isPartitionStyle = false

// Handle handles admission requests.
func (h *RolloutCreateUpdateHandler) Handle(ctx context.Context, req admission.Request) admission.Response {
switch req.Operation {
Expand Down Expand Up @@ -155,6 +161,14 @@ func (h *RolloutCreateUpdateHandler) validateRolloutUpdate(oldObj, newObj *appsv
}

func (h *RolloutCreateUpdateHandler) validateRollout(rollout *appsv1beta1.Rollout) field.ErrorList {
isPartitionStyle = appsv1beta1.IsRealPartition(rollout)
replicasLimitWithTraffic = defaultReplicasLimitWithTraffic // reset to default value
if limit, ok := rollout.Annotations[appsv1beta1.PartitionReplicasLimitWithTraffic]; ok {
strVal := intstr.FromString(limit)
if val, err := intstr.GetScaledValueFromIntOrPercent(&strVal, 100, true); err == nil && 0 < val && val <= 100 {
replicasLimitWithTraffic = val
}
}
errList := validateRolloutSpec(rollout, field.NewPath("Spec"))
errList = append(errList, h.validateRolloutConflict(rollout, field.NewPath("Conflict Checker"))...)
return errList
Expand Down Expand Up @@ -213,6 +227,7 @@ type TrafficRule string

const (
TrafficRuleCanary TrafficRule = "Canary"
TrafficRulePartition TrafficRule = "Partition"
TrafficRuleBlueGreen TrafficRule = "BlueGreen"
NoTraffic TrafficRule = "NoTraffic"
)
Expand All @@ -221,6 +236,9 @@ func validateRolloutSpecCanaryStrategy(canary *appsv1beta1.CanaryStrategy, fldPa
trafficRule := NoTraffic
if len(canary.TrafficRoutings) > 0 {
trafficRule = TrafficRuleCanary
if isPartitionStyle {
trafficRule = TrafficRulePartition
}
}
errList := validateRolloutSpecCanarySteps(canary.Steps, fldPath.Child("Steps"), trafficRule)
if len(canary.TrafficRoutings) > 1 {
Expand Down Expand Up @@ -293,7 +311,17 @@ func validateRolloutSpecCanarySteps(steps []appsv1beta1.CanaryStep, fldPath *fie
return field.ErrorList{field.Invalid(fldPath.Index(i).Child("Replicas"),
s.Replicas, `replicas must be positive number, or a percentage with "0%" < canaryReplicas <= "100%"`)}
}
if trafficRule == NoTraffic || s.Traffic == nil {
if trafficRule == NoTraffic || (s.Traffic == nil && len(s.Matches) == 0) {
continue
}
// traffic strategy is configured for current step && replicas is percentage
if trafficRule == TrafficRulePartition && IsPercentageCanaryReplicasType(s.Replicas) {
currCanaryReplicas, _ := intstr.GetScaledValueFromIntOrPercent(s.Replicas, 100, true)
if currCanaryReplicas > replicasLimitWithTraffic {
return field.ErrorList{field.Invalid(fldPath.Index(i).Child("steps"), steps, `For patition style rollout: step[x].replicas must not greater than replicasLimitWithTraffic if traffic or matches specified`)}
}
}
if s.Traffic == nil {
continue
}
is := intstr.FromString(*s.Traffic)
Expand Down

0 comments on commit caca0f3

Please sign in to comment.