diff --git a/pkg/controller/rollout/rollout_canary.go b/pkg/controller/rollout/rollout_canary.go index 8c7cdb15..e77760be 100644 --- a/pkg/controller/rollout/rollout_canary.go +++ b/pkg/controller/rollout/rollout_canary.go @@ -76,19 +76,18 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error { klog.Infof("rollout(%s/%s) canary step jumped", c.Rollout.Namespace, c.Rollout.Name) return nil } - gracePeriodSeconds := util.GracePeriodSecondsOrDefault(c.Rollout.Spec.Strategy.GetTrafficRouting(), defaultGracePeriodSeconds) // When the first batch is trafficRouting rolling and the next steps are rolling release, // We need to clean up the canary-related resources first and then rollout the rest of the batch. currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1] if currentStep.Traffic == nil && len(currentStep.Matches) == 0 { tr := newTrafficRoutingContext(c) - done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr, false) + done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr) c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime if err != nil { return err } else if !done { klog.Infof("rollout(%s/%s) cleaning up canary-related resources", c.Rollout.Namespace, c.Rollout.Name) - expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second) + expectedTime := time.Now().Add(tr.RecheckDuration) c.RecheckTime = &expectedTime return nil } @@ -116,11 +115,11 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error { expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true) if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) { klog.Infof("special case detected: rollout(%s/%s) restore stable Service", c.Rollout.Namespace, c.Rollout.Name) - done, err := m.trafficRoutingManager.RestoreStableService(tr) + retry, err := m.trafficRoutingManager.RestoreStableService(tr) if err != nil { return err - } else if !done { - expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second) + } else if retry { + expectedTime := time.Now().Add(tr.RecheckDuration) c.RecheckTime = &expectedTime return nil } @@ -146,11 +145,11 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error { if canaryStatus.CurrentStepIndex == 1 { if !tr.DisableGenerateCanaryService { klog.Infof("special case detected: rollout(%s/%s) patch stable Service", c.Rollout.Namespace, c.Rollout.Name) - done, err := m.trafficRoutingManager.PatchStableService(tr) + retry, err := m.trafficRoutingManager.PatchStableService(tr) if err != nil { return err - } else if !done { - expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second) + } else if retry { + expectedTime := time.Now().Add(tr.RecheckDuration) c.RecheckTime = &expectedTime return nil } @@ -195,7 +194,7 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error { klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateTrafficRouting, canaryStatus.CurrentStepState) } - expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second) + expectedTime := time.Now().Add(tr.RecheckDuration) c.RecheckTime = &expectedTime case v1beta1.CanaryStepStateMetricsAnalysis: @@ -369,7 +368,7 @@ func (m *canaryReleaseManager) doCanaryFinalising(c *RolloutContext) (bool, erro } tr := newTrafficRoutingContext(c) // 2. remove stable service the pod revision selector, so stable service will be selector all version pods. - done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr, true) + done, err := m.trafficRoutingManager.FinalisingTrafficRouting(tr) c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime if err != nil || !done { return done, err @@ -380,7 +379,7 @@ func (m *canaryReleaseManager) doCanaryFinalising(c *RolloutContext) (bool, erro return done, err } // 4. modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods. - done, err = m.trafficRoutingManager.FinalisingTrafficRouting(tr, false) + done, err = m.trafficRoutingManager.FinalisingTrafficRouting(tr) c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime if err != nil || !done { return done, err diff --git a/pkg/controller/rollout/rollout_controller_test.go b/pkg/controller/rollout/rollout_controller_test.go index 930c6403..02fec15e 100644 --- a/pkg/controller/rollout/rollout_controller_test.go +++ b/pkg/controller/rollout/rollout_controller_test.go @@ -88,6 +88,7 @@ var ( Ingress: &v1beta1.IngressTrafficRouting{ Name: "echoserver", }, + GracePeriodSeconds: 0, // To facilitate testing, don't wait after traffic routing operation }, }, }, diff --git a/pkg/controller/rollout/rollout_progressing.go b/pkg/controller/rollout/rollout_progressing.go index 8e7396e0..7cfabf8d 100644 --- a/pkg/controller/rollout/rollout_progressing.go +++ b/pkg/controller/rollout/rollout_progressing.go @@ -431,11 +431,6 @@ func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error) if subStatus == nil { return true, nil } - gracePeriodSeconds := c.Rollout.Spec.Strategy.GetTrafficRouting()[0].GracePeriodSeconds - // To ensure respect for graceful time between these steps, we set start timer before the first step - if len(subStatus.FinalisingStep) == 0 { - subStatus.LastUpdateTime = &metav1.Time{Time: time.Now()} - } tr := newTrafficRoutingContext(c) klog.Infof("rollout(%s/%s) Finalising Step is %s", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep) switch subStatus.FinalisingStep { @@ -446,21 +441,11 @@ func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error) // firstly, restore the gateway resources (ingress/gatewayAPI/Istio), that means // only stable Service will accept the traffic case v1beta1.FinalisingStepTypeGateway: - //TODO - RestoreGateway returns (bool, error) pair instead of error only. - // return (fasle, nil): gateway is patched successfully, but we need time to observe; recheck later - // return (true, nil): gateway is patched successfully, and accepts the update successfully; go to next step then - // return (false, error): gateway encounters error when patched, or the update is not accepted; recheck later - err := r.trafficRoutingManager.RestoreGateway(tr) - if err != nil { + retry, err := r.trafficRoutingManager.RestoreGateway(tr) + if err != nil || retry { subStatus.LastUpdateTime = tr.LastUpdateTime return false, err } - // usually, GracePeriodSeconds means duration to wait after an operation is done, - // we use defaultGracePeriodSeconds+1 here because the timer started before the RestoreGateway step - if subStatus.LastUpdateTime != nil && time.Since(subStatus.LastUpdateTime.Time) < time.Second*time.Duration(gracePeriodSeconds+1) { - klog.Infof("rollout(%s/%s) in step (%s), and wait %d seconds", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep, gracePeriodSeconds+1) - return false, nil - } klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, subStatus.FinalisingStep) subStatus.LastUpdateTime = &metav1.Time{Time: time.Now()} subStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteBR @@ -490,7 +475,8 @@ func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error) first step of v3 release. */ case v1beta1.FinalisingStepTypeDeleteCanaryService: - err := r.trafficRoutingManager.RemoveCanaryService(tr) + // ignore the grace period because it is the last step + _, err := r.trafficRoutingManager.RemoveCanaryService(tr) if err != nil { subStatus.LastUpdateTime = tr.LastUpdateTime return false, err diff --git a/pkg/controller/rollout/rollout_progressing_test.go b/pkg/controller/rollout/rollout_progressing_test.go index dbef5108..15098490 100644 --- a/pkg/controller/rollout/rollout_progressing_test.go +++ b/pkg/controller/rollout/rollout_progressing_test.go @@ -533,7 +533,7 @@ func TestReconcileRolloutProgressing(t *testing.T) { }, }, { - name: "ReconcileRolloutProgressing rolling -> continueRelease1", + name: "ReconcileRolloutProgressing rolling -> continueRelease1", // add grace time to test the first step: restoring the gateway getObj: func() ([]*apps.Deployment, []*apps.ReplicaSet) { dep1 := deploymentDemo.DeepCopy() dep1.Spec.Template.Spec.Containers[0].Image = "echoserver:v3" @@ -558,10 +558,14 @@ func TestReconcileRolloutProgressing(t *testing.T) { return []*apps.Deployment{dep1, dep2}, []*apps.ReplicaSet{rs1, rs2} }, getNetwork: func() ([]*corev1.Service, []*netv1.Ingress) { - return []*corev1.Service{demoService.DeepCopy()}, []*netv1.Ingress{demoIngress.DeepCopy()} + c1 := demoIngress.DeepCopy() + c2 := demoIngress.DeepCopy() + c2.Name = c2.Name + "-canary" + return []*corev1.Service{demoService.DeepCopy()}, []*netv1.Ingress{c1, c2} }, getRollout: func() (*v1beta1.Rollout, *v1beta1.BatchRelease, *v1alpha1.TrafficRouting) { obj := rolloutDemo.DeepCopy() + obj.Spec.Strategy.Canary.TrafficRoutings[0].GracePeriodSeconds = 1 // add grace time to test fine step in continuous logic obj.Status.CanaryStatus.ObservedWorkloadGeneration = 2 obj.Status.CanaryStatus.RolloutHash = "f55bvd874d5f2fzvw46bv966x4bwbdv4wx6bd9f7b46ww788954b8z8w29b7wxfd" obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1" @@ -635,7 +639,7 @@ func TestReconcileRolloutProgressing(t *testing.T) { obj.Status.CanaryStatus.CanaryReadyReplicas = 3 obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2" obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade - obj.Status.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteCanaryService + obj.Status.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteBR cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing) cond.Reason = v1alpha1.ProgressingReasonInRolling util.SetRolloutCondition(&obj.Status, *cond) diff --git a/pkg/controller/trafficrouting/trafficrouting_controller.go b/pkg/controller/trafficrouting/trafficrouting_controller.go index e3e44df1..6d0928ba 100644 --- a/pkg/controller/trafficrouting/trafficrouting_controller.go +++ b/pkg/controller/trafficrouting/trafficrouting_controller.go @@ -123,13 +123,13 @@ func (r *TrafficRoutingReconciler) Reconcile(ctx context.Context, req ctrl.Reque done, err = r.trafficRoutingManager.DoTrafficRouting(newTrafficRoutingContext(tr)) } case v1alpha1.TrafficRoutingPhaseFinalizing: - done, err = r.trafficRoutingManager.FinalisingTrafficRouting(newTrafficRoutingContext(tr), false) + done, err = r.trafficRoutingManager.FinalisingTrafficRouting(newTrafficRoutingContext(tr)) if done { newStatus.Phase = v1alpha1.TrafficRoutingPhaseHealthy newStatus.Message = "TrafficRouting is Healthy" } case v1alpha1.TrafficRoutingPhaseTerminating: - done, err = r.trafficRoutingManager.FinalisingTrafficRouting(newTrafficRoutingContext(tr), false) + done, err = r.trafficRoutingManager.FinalisingTrafficRouting(newTrafficRoutingContext(tr)) if done { // remove trafficRouting finalizer err = r.handleFinalizer(tr) diff --git a/pkg/trafficrouting/manager.go b/pkg/trafficrouting/manager.go index a71e2ebe..3c2792fc 100644 --- a/pkg/trafficrouting/manager.go +++ b/pkg/trafficrouting/manager.go @@ -27,12 +27,13 @@ import ( "github.com/openkruise/rollouts/pkg/trafficrouting/network/gateway" "github.com/openkruise/rollouts/pkg/trafficrouting/network/ingress" "github.com/openkruise/rollouts/pkg/util" + "github.com/openkruise/rollouts/pkg/util/grace" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" - utilpointer "k8s.io/utils/pointer" + "k8s.io/utils/integer" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -59,6 +60,8 @@ type TrafficRoutingContext struct { LastUpdateTime *metav1.Time // won't work for Ingress and Gateway DisableGenerateCanaryService bool + // recheck time + RecheckDuration time.Duration } // Manager responsible for adjusting network resources @@ -158,8 +161,6 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { klog.Errorf("%s patch canary service(%s) selector failed: %s", c.Key, canaryService.Name, err.Error()) return false, err } - // update canary service time, and wait 3 seconds, just to be safe - c.LastUpdateTime = &metav1.Time{Time: time.Now()} klog.Infof("%s patch canary service(%s) selector(%s=%s) success", c.Key, canaryService.Name, c.RevisionLabelKey, c.CanaryRevision) serviceModified = true @@ -172,12 +173,12 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { return false, err } serviceModified = true - // update stable service time, and wait 3 seconds, just to be safe - c.LastUpdateTime = &metav1.Time{Time: time.Now()} klog.Infof("add %s stable service(%s) selector(%s=%s) success", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision) } if serviceModified { + // modification occurred, wait a grace period + c.LastUpdateTime = &metav1.Time{Time: time.Now()} return false, nil } } @@ -199,211 +200,120 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) { return true, nil } -func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext, onlyRestoreStableService bool) (bool, error) { +func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext) (bool, error) { if len(c.ObjectRef) == 0 { return true, nil } - trafficRouting := c.ObjectRef[0] - if trafficRouting.GracePeriodSeconds <= 0 { - trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds - } - - cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService) - trController, err := newNetworkProvider(m.Client, c, trafficRouting.Service, cServiceName) - if err != nil { - klog.Errorf("%s newTrafficRoutingController failed: %s", c.Key, err.Error()) - return false, err - } - noCanaryService := c.OnlyTrafficRouting || c.DisableGenerateCanaryService - // The "already-finalised" conditions of this FinalisingTrafficRouting function are: - // 1. the stable service has no selector - // 2. AND canary service has been cleaned up - var stableServiceRestored, canaryServiceRemoved bool - // check condition 1 - stableService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: trafficRouting.Service}} - if err := m.Get(context.TODO(), client.ObjectKeyFromObject(stableService), stableService); err != nil && !errors.IsNotFound(err) { - klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error()) - return false, err - } else { - stableServiceRestored = errors.IsNotFound(err) || stableService.Spec.Selector[c.RevisionLabelKey] == "" - } - // check condition 2 - cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}} - if err := m.Get(context.TODO(), client.ObjectKeyFromObject(cService), cService); err != nil && !errors.IsNotFound(err) { - klog.Errorf("%s get canary service(%s) failed: %s", c.Key, cServiceName, err.Error()) - return false, err - } else { - // if noCanaryService is true, we have never created canary service - canaryServiceRemoved = errors.IsNotFound(err) || noCanaryService - } - // only if both conditions are met - if canaryServiceRemoved && stableServiceRestored { - /* - even both of the conditions are met, we call Finalise - 1. In rollout failure case, this step ensures that the canary-ingress can be deleted in a time. - 2. For scenario that noCanaryService is true, stable Service is never patched, canary Service is never created, - What we need to do is just to call Finalise - note that, calling Finalise even for multiple times is not a big thing: - 1. it does nothing if it has already called once, since the corresponding annotation has been cleared - 2. the Finalish won't wait graceful period for now - */ - if err = trController.Finalise(context.TODO()); err != nil { - return false, err - } - return true, nil - } - klog.Infof("%s start finalising traffic routing", c.Key) // remove stable service the pod revision selector, so stable service will be selector all version pods. - verify, err := m.restoreStableService(c) - if err != nil || !verify { + if retry, err := m.RestoreStableService(c); err != nil || retry { return false, err - } else if onlyRestoreStableService { - return true, nil - } - - // First route 100% traffic to stable service - c.Strategy.Traffic = utilpointer.StringPtr("0%") - verify, err = trController.EnsureRoutes(context.TODO(), &c.Strategy) - if err != nil { - return false, err - } else if !verify { - c.LastUpdateTime = &metav1.Time{Time: time.Now()} - return false, nil } - if c.LastUpdateTime != nil { - // After restore the stable service configuration, give network provider 3 seconds to react - if verifyTime := c.LastUpdateTime.Add(time.Second * time.Duration(trafficRouting.GracePeriodSeconds)); verifyTime.After(time.Now()) { - klog.Infof("%s route 100% traffic to stable service, and wait a moment", c.Key) - return false, nil - } - } - // modify network(ingress & gateway api) configuration, route all traffic to stable service - if err = trController.Finalise(context.TODO()); err != nil { + if retry, err := m.RestoreGateway(c); err != nil || retry { return false, err } - // end to end deployment scenario OR disableGenerateCanaryService is true, don't remove the canary service; - // because canary service is stable service (ie. no external canary service was created at all) - if !noCanaryService { - // remove canary service - err = m.Delete(context.TODO(), cService) - if err != nil && !errors.IsNotFound(err) { - klog.Errorf("%s remove canary service(%s) failed: %s", c.Key, cService.Name, err.Error()) - return false, err - } - klog.Infof("%s remove canary service(%s) success", c.Key, cService.Name) + // remove canary service + if retry, err := m.RemoveCanaryService(c); err != nil || retry { + return false, err } return true, nil } // RestoreGateway restore gateway resources without graceful time -func (m *Manager) RestoreGateway(c *TrafficRoutingContext) error { +// err means error occurred, should retry later. only if err is nil, we consider the bool +// bool means whether the calling function should retry later +func (m *Manager) RestoreGateway(c *TrafficRoutingContext) (bool, error) { if len(c.ObjectRef) == 0 { - return nil + return false, nil } - trafficRouting := c.ObjectRef[0] - cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService) - trController, err := newNetworkProvider(m.Client, c, trafficRouting.Service, cServiceName) + // build up the network provider + stableService := c.ObjectRef[0].Service + cServiceName := getCanaryServiceName(stableService, c.OnlyTrafficRouting, c.DisableGenerateCanaryService) + trController, err := newNetworkProvider(m.Client, c, stableService, cServiceName) if err != nil { klog.Errorf("%s newTrafficRoutingController failed: %s", c.Key, err.Error()) - return err - } - return trController.Finalise(context.TODO()) -} - -// RemoveCanaryService find and delete canary Service. stable Service won't be modified -func (m *Manager) RemoveCanaryService(c *TrafficRoutingContext) error { - if len(c.ObjectRef) == 0 { - return nil + return false, err } - trafficRouting := c.ObjectRef[0] - cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService) - cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}} - // end to end deployment scenario OR disableGenerateCanaryService is true, don't remove the canary service; - // because canary service is stable service (ie. no external canary service was created at all) - if !(c.OnlyTrafficRouting || c.DisableGenerateCanaryService) { - // remove canary service - err := m.Delete(context.TODO(), cService) - if err != nil && !errors.IsNotFound(err) { - klog.Errorf("%s remove canary service(%s) failed: %s", c.Key, cService.Name, err.Error()) - return err + // restore Gateway/Ingress/Istio + graceSeconds := GetGraceSeconds(c.ObjectRef, defaultGracePeriodSeconds) + retry, remaining, err := grace.RunWithGraceSeconds(string(c.OwnerRef.UID), "restoreGateway", graceSeconds, func() (bool, error) { + modified, err := trController.Finalise(context.TODO()) + if modified { + c.LastUpdateTime = &metav1.Time{Time: time.Now()} } - klog.Infof("%s remove canary service(%s) success", c.Key, cService.Name) - } - - return nil + return modified, err + }) + UpdateRecheckDuration(c, remaining) + return retry, err } -// returning (false, nil) means the update has been submitted, and no error occurred -// but we need to wait graceful time before returning true -func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) { +func (m *Manager) RemoveCanaryService(c *TrafficRoutingContext) (bool, error) { if len(c.ObjectRef) == 0 { - return true, nil + return false, nil } + // end to end deployment scenario OR disableGenerateCanaryService is true, don't remove the canary service; + // because canary service is stable service (ie. canary service is never created from the beginning) if c.OnlyTrafficRouting || c.DisableGenerateCanaryService { - return true, nil + return false, nil } - gracePeriodSeconds := util.GracePeriodSecondsOrDefault(c.ObjectRef, defaultGracePeriodSeconds) - trafficRouting := c.ObjectRef[0] - //fetch stable service - stableService := &corev1.Service{} - err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService) - if err != nil { - klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error()) - // not found, wait a moment, retry + cServiceName := getCanaryServiceName(c.ObjectRef[0].Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService) + cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}} + key := types.NamespacedName{ + Namespace: c.Namespace, + Name: cServiceName, + } + // remove canary service + graceSeconds := GetGraceSeconds(c.ObjectRef, defaultGracePeriodSeconds) + retry, remaining, err := grace.RunWithGraceSeconds(key.String(), "removeCanaryService", graceSeconds, func() (bool, error) { + err := m.Delete(context.TODO(), cService) if errors.IsNotFound(err) { return false, nil } - return false, err - } - - if stableService.Spec.Selector[c.RevisionLabelKey] == c.StableRevision { - if c.LastUpdateTime == nil { - return true, nil - } - if time.Since(c.LastUpdateTime.Time) < time.Second*time.Duration(gracePeriodSeconds) { - klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, gracePeriodSeconds) - return false, nil + if err != nil { + klog.Errorf("%s remove canary service(%s) failed: %s", c.Key, cService.Name, err.Error()) + return false, err } - klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success and complete", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision) return true, nil - } - - // patch stable service to only select the stable pods - body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.StableRevision) - if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil { - klog.Errorf("%s patch stable service(%s) selector failed: %s", c.Key, stableService.Name, err.Error()) - return false, err - } - c.LastUpdateTime = &metav1.Time{Time: time.Now()} - klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, gracePeriodSeconds) - return false, nil + }) + UpdateRecheckDuration(c, remaining) + return retry, err } -// returning (false, nil) means the update has been submitted, and no error occurred -// but we need to wait graceful time before returning true -func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) { +func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) { if len(c.ObjectRef) == 0 { + return false, nil + } + if c.OnlyTrafficRouting || c.DisableGenerateCanaryService { return true, nil } - trafficRouting := c.ObjectRef[0] - //fetch stable service + + // fetch stable service stableService := &corev1.Service{} - err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService) + serviceName := c.ObjectRef[0].Service + err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: serviceName}, stableService) if err != nil { - if errors.IsNotFound(err) { - return true, nil - } - klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error()) + klog.Errorf("%s get stable service(%s) failed: %s", c.Key, serviceName, err.Error()) return false, err } + // restore stable Service - verify, err := m.restoreStableService(c) - if err != nil || !verify { - return false, err - } - return true, nil + graceSeconds := GetGraceSeconds(c.ObjectRef, defaultGracePeriodSeconds) + retry, remaining, err := grace.RunWithGraceSeconds(string(stableService.UID), "patchService", graceSeconds, func() (bool, error) { + modified := false + if stableService.Spec.Selector[c.RevisionLabelKey] != c.StableRevision { + body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.StableRevision) + if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil { + klog.Errorf("%s patch stable service(%s) selector failed: %s", c.Key, stableService.Name, err.Error()) + return false, err + } + c.LastUpdateTime = &metav1.Time{Time: time.Now()} + modified = true + } + return modified, nil + }) + UpdateRecheckDuration(c, remaining) + return retry, err } func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, cService string) (network.NetworkProvider, error) { @@ -475,41 +385,40 @@ func (m *Manager) createCanaryService(c *TrafficRoutingContext, cService string, } // remove stable service the pod revision selector, so stable service will be selector all version pods. -func (m *Manager) restoreStableService(c *TrafficRoutingContext) (bool, error) { - trafficRouting := c.ObjectRef[0] - if trafficRouting.GracePeriodSeconds <= 0 { - trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds +func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) { + if len(c.ObjectRef) == 0 { + return false, nil } - //fetch stable service + + // fetch the stable Service stableService := &corev1.Service{} - err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService) + serviceName := c.ObjectRef[0].Service + err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: serviceName}, stableService) + if errors.IsNotFound(err) { + return true, nil + } if err != nil { - if errors.IsNotFound(err) { - return true, nil - } - klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error()) + klog.Errorf("%s get stable service(%s) failed: %s", c.Key, serviceName, err.Error()) return false, err } - if stableService.Spec.Selector[c.RevisionLabelKey] != "" { - body := fmt.Sprintf(`{"spec":{"selector":{"%s":null}}}`, c.RevisionLabelKey) - if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil { - klog.Errorf("%s patch stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error()) - return false, err + + // restore stable Service + graceSeconds := GetGraceSeconds(c.ObjectRef, defaultGracePeriodSeconds) + retry, remaining, err := grace.RunWithGraceSeconds(string(stableService.UID), "restoreService", graceSeconds, func() (bool, error) { + modified := false + if stableService.Spec.Selector[c.RevisionLabelKey] != "" { + body := fmt.Sprintf(`{"spec":{"selector":{"%s":null}}}`, c.RevisionLabelKey) + if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil { + klog.Errorf("%s patch stable service(%s) failed: %s", c.Key, serviceName, err.Error()) + return false, err + } + c.LastUpdateTime = &metav1.Time{Time: time.Now()} + modified = true } - klog.Infof("remove %s stable service(%s) pod revision selector, and wait a moment", c.Key, trafficRouting.Service) - c.LastUpdateTime = &metav1.Time{Time: time.Now()} - return false, nil - } - if c.LastUpdateTime == nil { - return true, nil - } - // After restore the stable service configuration, give network provider 3 seconds to react - if verifyTime := c.LastUpdateTime.Add(time.Second * time.Duration(trafficRouting.GracePeriodSeconds)); verifyTime.After(time.Now()) { - klog.Infof("%s restoring stable service(%s), and wait a moment", c.Key, trafficRouting.Service) - return false, nil - } - klog.Infof("%s doFinalising stable service(%s) success", c.Key, trafficRouting.Service) - return true, nil + return modified, nil + }) + UpdateRecheckDuration(c, remaining) + return retry, err } func getCanaryServiceName(sService string, onlyTrafficRouting bool, disableGenerateCanaryService bool) string { @@ -518,3 +427,27 @@ func getCanaryServiceName(sService string, onlyTrafficRouting bool, disableGener } return fmt.Sprintf("%s-canary", sService) } + +func GetGraceSeconds(refs []v1beta1.TrafficRoutingRef, defaultSeconds int32) (graceSeconds int32) { + if len(refs) == 0 { + klog.Infof("no trafficRoutingRef, use defaultGracePeriodSeconds(%d)", defaultSeconds) + return defaultSeconds + } + for i := range refs { + graceSeconds = integer.Int32Max(graceSeconds, refs[i].GracePeriodSeconds) + } + // user may intentionally set graceSeconds as 0 (if not provided, defaults to 3) + // we respect it + if graceSeconds < 0 { + klog.Infof("negative graceSeconds(%d), use defaultGracePeriodSeconds(%d)", graceSeconds, defaultSeconds) + return defaultSeconds + } + klog.Infof("use graceSeconds(%d)", graceSeconds) + return +} + +func UpdateRecheckDuration(c *TrafficRoutingContext, remaining time.Duration) { + if c.RecheckDuration < remaining { + c.RecheckDuration = remaining + } +} diff --git a/pkg/trafficrouting/manager_test.go b/pkg/trafficrouting/manager_test.go index f8f6bf06..2ce96fd3 100644 --- a/pkg/trafficrouting/manager_test.go +++ b/pkg/trafficrouting/manager_test.go @@ -29,6 +29,7 @@ import ( "github.com/openkruise/rollouts/api/v1beta1" "github.com/openkruise/rollouts/pkg/util" "github.com/openkruise/rollouts/pkg/util/configuration" + "github.com/openkruise/rollouts/pkg/util/grace" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" @@ -156,6 +157,8 @@ var ( Ingress: &v1beta1.IngressTrafficRouting{ Name: "echoserver", }, + // webhook doesn't work in unit test, we manually set gracePeriodSeconds to 1 + GracePeriodSeconds: 1, }, }, }, @@ -932,15 +935,16 @@ func TestDoTrafficRoutingWithIstio(t *testing.T) { func TestFinalisingTrafficRouting(t *testing.T) { cases := []struct { - name string - getObj func() ([]*corev1.Service, []*netv1.Ingress) - getRollout func() (*v1beta1.Rollout, *util.Workload) - onlyRestoreStableService bool - expectObj func() ([]*corev1.Service, []*netv1.Ingress) - expectDone bool + name string + getObj func() ([]*corev1.Service, []*netv1.Ingress) + getRollout func() (*v1beta1.Rollout, *util.Workload) + onlyTrafficRouting bool + expectObj func() ([]*corev1.Service, []*netv1.Ingress) + expectNotFound func() ([]*corev1.Service, []*netv1.Ingress) + expectDone bool }{ { - name: "FinalisingTrafficRouting test1", + name: "FinalisingTrafficRouting test1", // will restore the stable service getObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() s1.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v1" @@ -962,7 +966,6 @@ func TestFinalisingTrafficRouting(t *testing.T) { obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)} return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, - onlyRestoreStableService: true, expectObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() s2 := demoService.DeepCopy() @@ -976,10 +979,13 @@ func TestFinalisingTrafficRouting(t *testing.T) { c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary" return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2} }, + expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) { + return nil, nil + }, expectDone: false, }, { - name: "FinalisingTrafficRouting test2", + name: "stable Service already clear", // will restore the gateway getObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() s2 := demoService.DeepCopy() @@ -997,129 +1003,137 @@ func TestFinalisingTrafficRouting(t *testing.T) { obj := demoRollout.DeepCopy() obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted obj.Status.CanaryStatus.CurrentStepIndex = 4 - obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(time.Hour)} + obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)} return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, - onlyRestoreStableService: true, expectObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() - s2 := demoService.DeepCopy() - s2.Name = "echoserver-canary" - s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2" c1 := demoIngress.DeepCopy() + return []*corev1.Service{s1}, []*netv1.Ingress{c1} + }, + expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) { c2 := demoIngress.DeepCopy() c2.Name = "echoserver-canary" - c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true" - c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "100" - c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary" - return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2} + return nil, []*netv1.Ingress{c2} }, expectDone: false, }, { - name: "FinalisingTrafficRouting test3", + name: "gateway already restored", // will remove the canary service getObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() s2 := demoService.DeepCopy() s2.Name = "echoserver-canary" s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2" c1 := demoIngress.DeepCopy() + return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1} + }, + getRollout: func() (*v1beta1.Rollout, *util.Workload) { + obj := demoRollout.DeepCopy() + obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted + obj.Status.CanaryStatus.CurrentStepIndex = 4 + obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)} + return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} + }, + expectObj: func() ([]*corev1.Service, []*netv1.Ingress) { + s1 := demoService.DeepCopy() + c1 := demoIngress.DeepCopy() + return []*corev1.Service{s1}, []*netv1.Ingress{c1} + }, + expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) { + s2 := demoService.DeepCopy() + s2.Name = "echoserver-canary" c2 := demoIngress.DeepCopy() c2.Name = "echoserver-canary" - c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true" - c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "100" - c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary" - return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2} + return []*corev1.Service{s2}, []*netv1.Ingress{c2} + }, + expectDone: false, + }, + { + name: "canary Service already clear", // all is done + getObj: func() ([]*corev1.Service, []*netv1.Ingress) { + s1 := demoService.DeepCopy() + c1 := demoIngress.DeepCopy() + return []*corev1.Service{s1}, []*netv1.Ingress{c1} }, getRollout: func() (*v1beta1.Rollout, *util.Workload) { obj := demoRollout.DeepCopy() obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted obj.Status.CanaryStatus.CurrentStepIndex = 4 - obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-10 * time.Second)} + obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)} return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, - onlyRestoreStableService: true, expectObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() + c1 := demoIngress.DeepCopy() + return []*corev1.Service{s1}, []*netv1.Ingress{c1} + }, + expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) { s2 := demoService.DeepCopy() s2.Name = "echoserver-canary" - s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2" - c1 := demoIngress.DeepCopy() c2 := demoIngress.DeepCopy() c2.Name = "echoserver-canary" - c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true" - c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "100" - c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary" - return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2} + return []*corev1.Service{s2}, []*netv1.Ingress{c2} }, expectDone: true, }, { - name: "FinalisingTrafficRouting test4", + name: "OnlyTrafficRouting true - test 1", getObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() - s2 := demoService.DeepCopy() - s2.Name = "echoserver-canary" - s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2" c1 := demoIngress.DeepCopy() c2 := demoIngress.DeepCopy() c2.Name = "echoserver-canary" c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true" c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "100" c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary" - return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2} + return []*corev1.Service{s1}, []*netv1.Ingress{c1, c2} }, getRollout: func() (*v1beta1.Rollout, *util.Workload) { obj := demoRollout.DeepCopy() obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted obj.Status.CanaryStatus.CurrentStepIndex = 4 - obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-3 * time.Second)} + obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)} return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, - onlyRestoreStableService: false, + onlyTrafficRouting: true, expectObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() - s2 := demoService.DeepCopy() - s2.Name = "echoserver-canary" - s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2" c1 := demoIngress.DeepCopy() + return []*corev1.Service{s1}, []*netv1.Ingress{c1} + }, + expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) { c2 := demoIngress.DeepCopy() c2.Name = "echoserver-canary" - c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true" - c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "0" - c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary" - return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2} + return nil, []*netv1.Ingress{c2} }, expectDone: false, }, { - name: "FinalisingTrafficRouting test5", + name: "OnlyTrafficRouting true - test 2", getObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() - s2 := demoService.DeepCopy() - s2.Name = "echoserver-canary" - s2.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey] = "podtemplatehash-v2" c1 := demoIngress.DeepCopy() - c2 := demoIngress.DeepCopy() - c2.Name = "echoserver-canary" - c2.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)] = "true" - c2.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)] = "0" - c2.Spec.Rules[0].HTTP.Paths[0].Backend.Service.Name = "echoserver-canary" - return []*corev1.Service{s1, s2}, []*netv1.Ingress{c1, c2} + return []*corev1.Service{s1}, []*netv1.Ingress{c1} }, getRollout: func() (*v1beta1.Rollout, *util.Workload) { obj := demoRollout.DeepCopy() obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateCompleted obj.Status.CanaryStatus.CurrentStepIndex = 4 - obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-3 * time.Second)} + obj.Status.CanaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now().Add(-time.Hour)} return obj, &util.Workload{RevisionLabelKey: apps.DefaultDeploymentUniqueLabelKey} }, - onlyRestoreStableService: false, + onlyTrafficRouting: true, expectObj: func() ([]*corev1.Service, []*netv1.Ingress) { s1 := demoService.DeepCopy() c1 := demoIngress.DeepCopy() return []*corev1.Service{s1}, []*netv1.Ingress{c1} }, + expectNotFound: func() ([]*corev1.Service, []*netv1.Ingress) { + c2 := demoIngress.DeepCopy() + c2.Name = "echoserver-canary" + return nil, []*netv1.Ingress{c2} + }, expectDone: true, }, } @@ -1138,18 +1152,19 @@ func TestFinalisingTrafficRouting(t *testing.T) { newStatus := rollout.Status.DeepCopy() currentStep := rollout.Spec.Strategy.Canary.Steps[newStatus.CanaryStatus.CurrentStepIndex-1] c := &TrafficRoutingContext{ - Key: fmt.Sprintf("Rollout(%s/%s)", rollout.Namespace, rollout.Name), - Namespace: rollout.Namespace, - ObjectRef: rollout.Spec.Strategy.Canary.TrafficRoutings, - Strategy: currentStep.TrafficRoutingStrategy, - OwnerRef: *metav1.NewControllerRef(rollout, v1beta1.SchemeGroupVersion.WithKind("Rollout")), - RevisionLabelKey: workload.RevisionLabelKey, - StableRevision: newStatus.CanaryStatus.StableRevision, - CanaryRevision: newStatus.CanaryStatus.PodTemplateHash, - LastUpdateTime: newStatus.CanaryStatus.LastUpdateTime, + Key: fmt.Sprintf("Rollout(%s/%s)", rollout.Namespace, rollout.Name), + Namespace: rollout.Namespace, + ObjectRef: rollout.Spec.Strategy.Canary.TrafficRoutings, + Strategy: currentStep.TrafficRoutingStrategy, + OwnerRef: *metav1.NewControllerRef(rollout, v1beta1.SchemeGroupVersion.WithKind("Rollout")), + RevisionLabelKey: workload.RevisionLabelKey, + StableRevision: newStatus.CanaryStatus.StableRevision, + CanaryRevision: newStatus.CanaryStatus.PodTemplateHash, + LastUpdateTime: newStatus.CanaryStatus.LastUpdateTime, + OnlyTrafficRouting: cs.onlyTrafficRouting, } manager := NewTrafficRoutingManager(client) - done, err := manager.FinalisingTrafficRouting(c, cs.onlyRestoreStableService) + done, err := manager.FinalisingTrafficRouting(c) if err != nil { t.Fatalf("DoTrafficRouting failed: %s", err) } @@ -1163,6 +1178,16 @@ func TestFinalisingTrafficRouting(t *testing.T) { for _, obj := range ig { checkObjEqual(client, t, obj) } + + ss, ig = cs.expectNotFound() + for _, obj := range ss { + checkNotFound(client, t, obj) + } + for _, obj := range ig { + checkNotFound(client, t, obj) + } + // empty the grace expectations to avoid making effects on the following test + grace.ResetExpectations() }) } } @@ -1175,6 +1200,7 @@ func TestRestoreGateway(t *testing.T) { onlyTrafficRouting bool expectObj func() ([]*corev1.Service, []*netv1.Ingress) expectNotFound func() ([]*corev1.Service, []*netv1.Ingress) + retry bool }{ { name: "Restore Gateway test1", @@ -1212,6 +1238,7 @@ func TestRestoreGateway(t *testing.T) { c2.Name = "echoserver-canary" return nil, []*netv1.Ingress{c2} }, + retry: true, }, } @@ -1241,9 +1268,12 @@ func TestRestoreGateway(t *testing.T) { OnlyTrafficRouting: cs.onlyTrafficRouting, } manager := NewTrafficRoutingManager(cli) - err := manager.RestoreGateway(c) + retry, err := manager.RestoreGateway(c) if err != nil { - t.Fatalf("DoTrafficRouting failed: %s", err) + t.Fatalf("RestoreGateway failed: %s", err) + } + if retry != cs.retry { + t.Fatalf("RestoreGateway expect(%v), but get(%v)", cs.retry, retry) } ss, ig = cs.expectObj() for _, obj := range ss { @@ -1260,6 +1290,12 @@ func TestRestoreGateway(t *testing.T) { for _, obj := range ig { checkNotFound(cli, t, obj) } + // the second call, it should be no error and no retry + time.Sleep(1 * time.Second) + retry, err = manager.RestoreGateway(c) + if err != nil || retry { + t.Fatalf("RestoreGateway failed: %s", err) + } }) } } @@ -1272,6 +1308,7 @@ func TestRemoveCanaryService(t *testing.T) { onlyTrafficRouting bool expectObj func() ([]*corev1.Service, []*netv1.Ingress) expectNotFound func() ([]*corev1.Service, []*netv1.Ingress) + retry bool }{ { name: "Restore Gateway test1", @@ -1311,6 +1348,7 @@ func TestRemoveCanaryService(t *testing.T) { s2.Name = "echoserver-canary" return []*corev1.Service{s2}, nil }, + retry: true, }, } @@ -1340,9 +1378,12 @@ func TestRemoveCanaryService(t *testing.T) { OnlyTrafficRouting: cs.onlyTrafficRouting, } manager := NewTrafficRoutingManager(cli) - err := manager.RemoveCanaryService(c) + retry, err := manager.RemoveCanaryService(c) if err != nil { - t.Fatalf("DoTrafficRouting failed: %s", err) + t.Fatalf("RemoveCanaryService failed: %s", err) + } + if retry != cs.retry { + t.Fatalf("RemoveCanaryService expect(%v), but get(%v)", false, retry) } ss, ig = cs.expectObj() for _, obj := range ss { @@ -1359,6 +1400,12 @@ func TestRemoveCanaryService(t *testing.T) { for _, obj := range ig { checkNotFound(cli, t, obj) } + // the second call, it should be no error and no retry + time.Sleep(1 * time.Second) + retry, err = manager.RemoveCanaryService(c) + if err != nil || retry { + t.Fatalf("RemoveCanaryService failed: %s", err) + } }) } } diff --git a/pkg/trafficrouting/network/customNetworkProvider/custom_network_provider.go b/pkg/trafficrouting/network/customNetworkProvider/custom_network_provider.go index c2828769..7e63b0ab 100644 --- a/pkg/trafficrouting/network/customNetworkProvider/custom_network_provider.go +++ b/pkg/trafficrouting/network/customNetworkProvider/custom_network_provider.go @@ -36,6 +36,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/klog/v2" utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" @@ -112,11 +113,6 @@ func (r *customController) Initialize(ctx context.Context) error { // when ensuring routes, first execute lua for all custom providers, then update func (r *customController) EnsureRoutes(ctx context.Context, strategy *v1beta1.TrafficRoutingStrategy) (bool, error) { done := true - // *strategy.Weight == 0 indicates traffic routing is doing finalising and tries to route whole traffic to stable service - // then directly do finalising - if strategy.Traffic != nil && *strategy.Traffic == "0%" { - return true, nil - } var err error customNetworkRefList := make([]*unstructured.Unstructured, len(r.conf.TrafficConf)) @@ -182,8 +178,9 @@ func (r *customController) EnsureRoutes(ctx context.Context, strategy *v1beta1.T return done, nil } -func (r *customController) Finalise(ctx context.Context) error { - done := true +func (r *customController) Finalise(ctx context.Context) (bool, error) { + modified := false + errList := field.ErrorList{} for _, ref := range r.conf.TrafficConf { obj := &unstructured.Unstructured{} obj.SetAPIVersion(ref.APIVersion) @@ -193,19 +190,19 @@ func (r *customController) Finalise(ctx context.Context) error { klog.Infof("custom network provider %s(%s/%s) not found when finalising", ref.Kind, r.conf.RolloutNs, ref.Name) continue } + errList = append(errList, field.InternalError(field.NewPath("GetCustomNetworkProvider"), err)) klog.Errorf("failed to get %s(%s/%s) when finalising, process next first", ref.Kind, r.conf.RolloutNs, ref.Name) - done = false continue } - if err := r.restoreObject(obj); err != nil { - done = false + if updated, err := r.restoreObject(obj); err != nil { + errList = append(errList, field.InternalError(field.NewPath("RestoreCustomNetworkProvider"), err)) klog.Errorf("failed to restore %s(%s/%s) when finalising: %s", ref.Kind, r.conf.RolloutNs, ref.Name, err.Error()) + } else if updated { + modified = true } } - if !done { - return fmt.Errorf("finalising work for %s is not done", r.conf.Key) - } - return nil + + return modified, errList.ToAggregate() } // store spec of an object in OriginalSpecAnnotation @@ -237,11 +234,11 @@ func (r *customController) storeObject(obj *unstructured.Unstructured) error { } // restore an object from spec stored in OriginalSpecAnnotation -func (r *customController) restoreObject(obj *unstructured.Unstructured) error { +func (r *customController) restoreObject(obj *unstructured.Unstructured) (modified bool, err error) { annotations := obj.GetAnnotations() if annotations == nil || annotations[OriginalSpecAnnotation] == "" { klog.Infof("OriginalSpecAnnotation not found in custom network provider %s(%s/%s)", obj.GetKind(), r.conf.RolloutNs, obj.GetName()) - return nil + return false, nil } oSpecStr := annotations[OriginalSpecAnnotation] var oSpec Data @@ -251,10 +248,10 @@ func (r *customController) restoreObject(obj *unstructured.Unstructured) error { obj.SetLabels(oSpec.Labels) if err := r.Update(context.TODO(), obj); err != nil { klog.Errorf("failed to restore object %s(%s/%s) from annotation(%s): %s", obj.GetKind(), r.conf.RolloutNs, obj.GetName(), OriginalSpecAnnotation, err.Error()) - return err + return false, err } klog.Infof("restore custom network provider %s(%s/%s) from annotation(%s) success", obj.GetKind(), obj.GetNamespace(), obj.GetName(), OriginalSpecAnnotation) - return nil + return true, nil } func (r *customController) executeLuaForCanary(spec Data, strategy *v1beta1.TrafficRoutingStrategy, luaScript string) (Data, error) { diff --git a/pkg/trafficrouting/network/customNetworkProvider/custom_network_provider_test.go b/pkg/trafficrouting/network/customNetworkProvider/custom_network_provider_test.go index 82fc0f1a..0e0865b7 100644 --- a/pkg/trafficrouting/network/customNetworkProvider/custom_network_provider_test.go +++ b/pkg/trafficrouting/network/customNetworkProvider/custom_network_provider_test.go @@ -596,6 +596,7 @@ func TestFinalise(t *testing.T) { getUnstructured func() *unstructured.Unstructured getConfig func() Config expectUnstructured func() *unstructured.Unstructured + modified bool }{ { name: "test1, finalise VirtualService", @@ -631,6 +632,7 @@ func TestFinalise(t *testing.T) { _ = u.UnmarshalJSON([]byte(virtualServiceDemo)) return u }, + modified: true, }, } @@ -643,10 +645,13 @@ func TestFinalise(t *testing.T) { return } c, _ := NewCustomController(fakeCli, cs.getConfig()) - err = c.Finalise(context.TODO()) + modified, err := c.Finalise(context.TODO()) if err != nil { t.Fatalf("Initialize failed: %s", err.Error()) } + if cs.modified != modified { + t.Fatalf("is modified: expect(%v), but get(%v)", cs.modified, modified) + } checkEqual(fakeCli, t, cs.expectUnstructured()) }) } diff --git a/pkg/trafficrouting/network/gateway/gateway.go b/pkg/trafficrouting/network/gateway/gateway.go index 208c76fa..a372c54f 100644 --- a/pkg/trafficrouting/network/gateway/gateway.go +++ b/pkg/trafficrouting/network/gateway/gateway.go @@ -94,20 +94,20 @@ func (r *gatewayController) EnsureRoutes(ctx context.Context, strategy *v1beta1. return false, nil } -func (r *gatewayController) Finalise(ctx context.Context) error { +func (r *gatewayController) Finalise(ctx context.Context) (bool, error) { httpRoute := &gatewayv1beta1.HTTPRoute{} err := r.Get(ctx, types.NamespacedName{Namespace: r.conf.Namespace, Name: *r.conf.TrafficConf.HTTPRouteName}, httpRoute) if err != nil { if errors.IsNotFound(err) { - return nil + return false, nil } klog.Errorf("%s get HTTPRoute failed: %s", r.conf.Key, err.Error()) - return err + return false, err } // desired rule desiredRule := r.buildDesiredHTTPRoute(httpRoute.Spec.Rules, utilpointer.Int32(-1), nil, nil) if reflect.DeepEqual(httpRoute.Spec.Rules, desiredRule) { - return nil + return false, nil } routeClone := &gatewayv1beta1.HTTPRoute{} if err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { @@ -119,10 +119,10 @@ func (r *gatewayController) Finalise(ctx context.Context) error { return r.Client.Update(context.TODO(), routeClone) }); err != nil { klog.Errorf("update %s httpRoute(%s) failed: %s", r.conf.Key, httpRoute.Name, err.Error()) - return err + return false, err } klog.Infof("%s TrafficRouting Finalise success", r.conf.Key) - return nil + return true, nil } func (r *gatewayController) buildDesiredHTTPRoute(rules []gatewayv1beta1.HTTPRouteRule, weight *int32, matches []v1beta1.HttpRouteMatch, diff --git a/pkg/trafficrouting/network/ingress/ingress.go b/pkg/trafficrouting/network/ingress/ingress.go index dacf712d..59b01ab1 100644 --- a/pkg/trafficrouting/network/ingress/ingress.go +++ b/pkg/trafficrouting/network/ingress/ingress.go @@ -147,23 +147,23 @@ func (r *ingressController) EnsureRoutes(ctx context.Context, strategy *v1beta1. return false, nil } -func (r *ingressController) Finalise(ctx context.Context) error { +func (r *ingressController) Finalise(ctx context.Context) (bool, error) { canaryIngress := &netv1.Ingress{} err := r.Get(ctx, types.NamespacedName{Namespace: r.conf.Namespace, Name: r.canaryIngressName}, canaryIngress) if err != nil && !errors.IsNotFound(err) { klog.Errorf("%s get canary ingress(%s) failed: %s", r.conf.Key, r.canaryIngressName, err.Error()) - return err + return false, err } if errors.IsNotFound(err) || !canaryIngress.DeletionTimestamp.IsZero() { - return nil + return false, nil } // immediate delete canary ingress if err = r.Delete(ctx, canaryIngress); err != nil { klog.Errorf("%s remove canary ingress(%s) failed: %s", r.conf.Key, canaryIngress.Name, err.Error()) - return err + return false, err } klog.Infof("%s remove canary ingress(%s) success", r.conf.Key, canaryIngress.Name) - return nil + return true, nil } func (r *ingressController) buildCanaryIngress(stableIngress *netv1.Ingress) *netv1.Ingress { diff --git a/pkg/trafficrouting/network/ingress/ingress_test.go b/pkg/trafficrouting/network/ingress/ingress_test.go index 23aada3d..307e1142 100644 --- a/pkg/trafficrouting/network/ingress/ingress_test.go +++ b/pkg/trafficrouting/network/ingress/ingress_test.go @@ -707,6 +707,7 @@ func TestFinalise(t *testing.T) { getConfigmap func() *corev1.ConfigMap getIngress func() []*netv1.Ingress expectIngress func() *netv1.Ingress + modified bool }{ { name: "finalise test1", @@ -726,6 +727,7 @@ func TestFinalise(t *testing.T) { expectIngress: func() *netv1.Ingress { return nil }, + modified: true, }, } @@ -749,11 +751,14 @@ func TestFinalise(t *testing.T) { t.Fatalf("NewIngressTrafficRouting failed: %s", err.Error()) return } - err = controller.Finalise(context.TODO()) + modified, err := controller.Finalise(context.TODO()) if err != nil { t.Fatalf("EnsureRoutes failed: %s", err.Error()) return } + if modified != cs.modified { + t.Fatalf("expect(%v), but get(%v)", cs.modified, modified) + } canaryIngress := &netv1.Ingress{} err = fakeCli.Get(context.TODO(), client.ObjectKey{Name: "echoserver-canary"}, canaryIngress) if err != nil { diff --git a/pkg/trafficrouting/network/interface.go b/pkg/trafficrouting/network/interface.go index 3f4b10fe..28acdf8e 100644 --- a/pkg/trafficrouting/network/interface.go +++ b/pkg/trafficrouting/network/interface.go @@ -35,6 +35,6 @@ type NetworkProvider interface { // When the first set weight is returned false, mainly to give the provider some time to process, only when again ensure, will return true EnsureRoutes(ctx context.Context, strategy *v1beta1.TrafficRoutingStrategy) (bool, error) // Finalise will do some cleanup work after the canary rollout complete, such as delete canary ingress. - // Finalise is called with a 3-second delay after completing the canary. - Finalise(ctx context.Context) error + // if error is nil, the return bool value means if the resources are modified + Finalise(ctx context.Context) (bool, error) } diff --git a/pkg/util/grace/grace_expectations.go b/pkg/util/grace/grace_expectations.go new file mode 100644 index 00000000..dc74dfa5 --- /dev/null +++ b/pkg/util/grace/grace_expectations.go @@ -0,0 +1,171 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grace + +import ( + "flag" + "sync" + "time" + + "k8s.io/klog/v2" +) + +type Action string + +const ( + // Create action + Create Action = "create" + // Delete action + Delete Action = "delete" + // action includes: patch service selector + Update Action = "patch" + // action includes: remove service selector/restore gateway + Restore Action = "unpatch" +) + +// Define variables for the default expectation timeout and DefaultGraceExpectations instance. +var ( + ExpectationGraceTimeout time.Duration + DefaultGraceExpectations = NewGraceExpectations() +) + +func init() { + flag.DurationVar(&ExpectationGraceTimeout, "grace-timeout", time.Minute*5, "The grace expectation timeout. Defaults 5min") + DefaultGraceExpectations.StartCleaner(ExpectationGraceTimeout) +} + +// NewGraceExpectations returns a GraceExpectations. +func NewGraceExpectations() *realGraceExpectations { + return &realGraceExpectations{ + controllerCache: make(map[string]timeCache), + } +} + +type timeCache map[Action]*time.Time + +type realGraceExpectations struct { + sync.RWMutex + controllerCache map[string]timeCache // key: parent key, workload namespace/name +} + +func (r *realGraceExpectations) GetExpectations(controllerKey string) timeCache { + r.RLock() + defer r.RUnlock() + + expectations := r.controllerCache[controllerKey] + if expectations == nil { + return nil + } + res := make(timeCache, len(expectations)) + for k, v := range expectations { + res[k] = v + } + return res +} + +func (r *realGraceExpectations) Expect(controllerKey string, action Action) { + r.Lock() + defer r.Unlock() + + expectations := r.controllerCache[controllerKey] + if expectations == nil { + expectations = make(timeCache) + r.controllerCache[controllerKey] = expectations + } + recordTime := time.Now() + expectations[action] = &recordTime +} + +func (r *realGraceExpectations) Observe(controllerKey string, action Action) { + r.Lock() + defer r.Unlock() + + expectations := r.controllerCache[controllerKey] + if expectations == nil { + return + } + delete(expectations, action) + if len(expectations) == 0 { + delete(r.controllerCache, controllerKey) + } +} + +func (r *realGraceExpectations) SatisfiedExpectations(controllerKey string, action Action, graceSeconds int32) (bool, time.Duration) { + r.Lock() + defer r.Unlock() + + expectations := r.controllerCache[controllerKey] + if expectations == nil { + return true, 0 + } + recordTime, ok := expectations[action] + if !ok { + return true, 0 + } + remaining := time.Duration(graceSeconds)*time.Second - time.Since(*recordTime) + if remaining <= 0 { + return true, 0 + } + return false, remaining +} + +func (r *realGraceExpectations) DeleteExpectations(controllerKey string) { + r.Lock() + defer r.Unlock() + delete(r.controllerCache, controllerKey) +} + +// cleaning outdated items +func (r *realGraceExpectations) CleanOutdatedItems(interval time.Duration) { + r.Lock() + defer r.Unlock() + + for controllerKey, expectations := range r.controllerCache { + for action, recordTime := range expectations { + if time.Since(*recordTime) > interval { + delete(expectations, action) + } + } + if len(expectations) == 0 { + klog.Infof("clean outdated item: %s", controllerKey) + delete(r.controllerCache, controllerKey) + } + } +} + +// Start a goroutine to clean outdated items every 5 minutes +func (r *realGraceExpectations) StartCleaner(interval time.Duration) { + klog.Infof("start grace expectations cleaner, interval: %v", interval) + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for range ticker.C { + klog.Info("---clean outdated items---") + r.CleanOutdatedItems(interval) + } + }() +} + +// warning: only used for test +func (r *realGraceExpectations) resetExpectations() { + r.Lock() + defer r.Unlock() + + for controllerKey := range r.controllerCache { + delete(r.controllerCache, controllerKey) + } +} diff --git a/pkg/util/grace/grace_expectations_test.go b/pkg/util/grace/grace_expectations_test.go new file mode 100644 index 00000000..6efbb8f2 --- /dev/null +++ b/pkg/util/grace/grace_expectations_test.go @@ -0,0 +1,209 @@ +package grace + +import ( + "testing" + "time" +) + +func TestGetExpectations(t *testing.T) { + r := NewGraceExpectations() + + now := time.Now() + r.controllerCache["testKey"] = timeCache{ + Create: &now, + } + + result := r.GetExpectations("testKey") + if result == nil || len(result) != 1 || result[Create] == nil { + t.Errorf("expected timeCache with one Create action, got %v", result) + } +} + +func TestExpect(t *testing.T) { + r := NewGraceExpectations() + r.Expect("testKey", Create) + + if _, exists := r.controllerCache["testKey"][Create]; !exists { + t.Errorf("expected Create action for testKey to be recorded") + } +} + +func TestObserve(t *testing.T) { + r := NewGraceExpectations() + + r.Expect("testKey", Create) + r.Observe("testKey", Create) + + if _, exists := r.controllerCache["testKey"]; exists { + t.Errorf("expected testKey to be removed from cache after Observe") + } +} + +func TestSatisfiedExpectations(t *testing.T) { + r := NewGraceExpectations() + r.Expect("testKey", Create) + + // Should be unsatisfied if graceSeconds is 0 (immediate timeout) + satisfied, _ := r.SatisfiedExpectations("testKey", Create, 0) + if !satisfied { + t.Errorf("expected expectations to be satisfied immediately for 0 graceSeconds") + } + + // Set a new expectation with some future grace period + r.Expect("testKey", Create) + satisfied, _ = r.SatisfiedExpectations("testKey", Create, 60) + if satisfied { + t.Errorf("expected expectations to be unsatisfied for 60 second grace period") + } +} + +func TestDeleteExpectations(t *testing.T) { + r := NewGraceExpectations() + r.Expect("testKey", Create) + + r.DeleteExpectations("testKey") + + if _, exists := r.controllerCache["testKey"]; exists { + t.Errorf("expected testKey to be deleted from cache after DeleteExpectations") + } +} + +func TestCleanOutdatedItems(t *testing.T) { + r := NewGraceExpectations() + + // Set an expectation well in the past so it is outdated + past := time.Now().Add(-time.Hour) + r.controllerCache["testKey"] = timeCache{ + Create: &past, + } + + r.CleanOutdatedItems(ExpectationGraceTimeout) + + if _, exists := r.controllerCache["testKey"]; exists { + t.Errorf("expected testKey to be removed by CleanOutdatedItems") + } + + // Set a recent expectation to ensure it is not cleaned out + r.Expect("testKey", Create) + recent := time.Now() + r.controllerCache["testKey"][Create] = &recent + + r.CleanOutdatedItems(ExpectationGraceTimeout) + + if _, exists := r.controllerCache["testKey"]; !exists { + t.Errorf("expected testKey to not be removed by CleanOutdatedItems") + } +} + +func TestResetExpectations(t *testing.T) { + r := NewGraceExpectations() + r.Expect("testKey", Create) + + r.resetExpectations() + + if _, exists := r.controllerCache["testKey"]; exists { + t.Errorf("expected controller cache to be empty after resetExpectations") + } +} + +func TestComprehensive(t *testing.T) { + // Initialize realGraceExpectations + r := NewGraceExpectations() + + // Add expectations + r.Expect("testController1", Create) + r.Expect("testController1", Delete) + r.Expect("testController2", Update) + + // Validate that expectations are correctly added + if expectations := r.GetExpectations("testController1"); len(expectations) != 2 { + t.Errorf("expected 2 actions for testController1, got %d", len(expectations)) + } + if expectations := r.GetExpectations("testController2"); len(expectations) != 1 { + t.Errorf("expected 1 action for testController2, got %d", len(expectations)) + } + + // Observe and remove a specific action + r.Observe("testController1", Create) + if expectations := r.GetExpectations("testController1"); len(expectations) != 1 { + t.Errorf("expected 1 action for testController1 after observation, got %d", len(expectations)) + } + + // Check satisfaction status for an existing expectation + satisfied, remaining := r.SatisfiedExpectations("testController1", Delete, 0) + if !satisfied || remaining > 0 { + t.Errorf("expected unsatisfied expectation for testController1 Delete action") + } + + // Check satisfaction status for a non-existing action + satisfied, _ = r.SatisfiedExpectations("testController1", Create, 0) + if !satisfied { + t.Errorf("expected satisfied status for non-existing Create action on testController1") + } + + // Advance the time and check satisfaction after timeout + r.Expect("testController3", Restore) + time.Sleep(2 * time.Second) + satisfied, _ = r.SatisfiedExpectations("testController3", Restore, 1) // 1 second grace period + if !satisfied { + t.Errorf("expected satisfied status for Restore action on testController3 after timeout") + } + + // Delete expectations for a controller key + r.DeleteExpectations("testController1") + if expectations := r.GetExpectations("testController1"); len(expectations) != 0 { + t.Errorf("expected no actions for testController1 after deletion") + } + + // Clean outdated items + r.Expect("testController4", Update) + outdated := time.Now().Add(-10 * time.Minute) // Set a past record time to be outdated + r.controllerCache["testController4"][Update] = &outdated + r.CleanOutdatedItems(ExpectationGraceTimeout) + if expectations := r.GetExpectations("testController4"); len(expectations) != 0 { + t.Errorf("expected no actions for testController4 after clean") + } + + // Reset expectations + r.Expect("testController5", Delete) + r.resetExpectations() + if len(r.controllerCache) != 0 { + t.Errorf("expected empty controller cache after reset") + } +} + +func TestStartCleaner(t *testing.T) { + // Shorten the interval for faster testing + testInterval := time.Millisecond * 100 + + // Initialize realGraceExpectations + r := NewGraceExpectations() + + // Add mixed expectations + past := time.Now().Add(-10 * time.Minute) + recent := time.Now().Add(testInterval * 3) + + r.controllerCache["testController1"] = timeCache{ + Create: &past, + Update: &recent, + } + r.controllerCache["testController2"] = timeCache{ + Delete: &past, + Restore: &recent, + } + + // Start the cleaner with a short interval + r.StartCleaner(testInterval) + + // Wait for the cleaner to run + time.Sleep(testInterval * 3) + + // Verify the outdated items have been cleaned and recent ones remain + if len(r.controllerCache["testController1"]) != 1 || r.controllerCache["testController1"][Update] != &recent { + t.Errorf("expected only recent Update action for testController1 to remain, found %d", len(r.controllerCache["testController1"])) + } + + if len(r.controllerCache["testController2"]) != 1 || r.controllerCache["testController2"][Restore] != &recent { + t.Errorf("expected only recent Restore action for testController2 to remain, found %d", len(r.controllerCache["testController2"])) + } +} diff --git a/pkg/util/grace/grace_wrapper.go b/pkg/util/grace/grace_wrapper.go new file mode 100644 index 00000000..67d90800 --- /dev/null +++ b/pkg/util/grace/grace_wrapper.go @@ -0,0 +1,65 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package grace + +import ( + "time" + + "k8s.io/klog/v2" +) + +func RunWithGraceSeconds(key, action string, graceSeconds int32, f func() (bool, error)) (bool, time.Duration, error) { + return runWithGraceSeconds(DefaultGraceExpectations, key, action, graceSeconds, f) +} + +// Returns: +// - error: If the passed function itself returns an error, the error is directly returned. +// - bool: Tells the caller whether a retry is needed. +// - time.Duration: The remaining time to wait. (only valid when error is nil) +// The passed function `f` needs to be idempotent. +// - The bool value returned by `f` indicates whether the related resources were updated in this call. +// - If resources were updated, we need to wait for `graceSeconds`. +func runWithGraceSeconds(e *realGraceExpectations, key, action string, graceSeconds int32, f func() (bool, error)) (bool, time.Duration, error) { + modified, err := f() + if err != nil { + return true, 0, err + } + // if user specify 0, it means no need to wait + if graceSeconds == 0 { + e.Observe(key, Action(action)) + return false, 0, nil + } + // if f return true, it means some resources are modified by f in this call + // we need to wait a grace period + if modified { + e.Expect(key, Action(action)) + klog.Infof("function return modified, expectation created, key: %s, action: %s, expect to wait %d seconds", key, action, graceSeconds) + return true, time.Duration(graceSeconds) * time.Second, nil + } + if satisfied, remaining := e.SatisfiedExpectations(key, Action(action), graceSeconds); !satisfied { + klog.Infof("expectation unsatisfied, key: %s, action: %s, remaining/graceSeconds: %.1f/%d", key, action, remaining.Seconds(), graceSeconds) + return true, remaining, nil + } + e.Observe(key, Action(action)) + klog.Infof("expectation satisfied, key: %s, action: %s", key, action) + return false, 0, nil +} + +// warning: only used for test +func ResetExpectations() { + DefaultGraceExpectations.resetExpectations() +} diff --git a/pkg/util/grace/grace_wrapper_test.go b/pkg/util/grace/grace_wrapper_test.go new file mode 100644 index 00000000..474b0d08 --- /dev/null +++ b/pkg/util/grace/grace_wrapper_test.go @@ -0,0 +1,132 @@ +package grace + +import ( + "errors" + "testing" + "time" +) + +func TestRunWithGraceSeconds(t *testing.T) { + tests := []struct { + name string + graceSeconds int32 + modified bool + err error + expectedRetry bool + expectedErr error + }{ + {name: "No modification, no grace period", graceSeconds: 0, modified: false, err: nil, expectedRetry: false, expectedErr: nil}, + {name: "Modification, with grace period", graceSeconds: 10, modified: true, err: nil, expectedRetry: true, expectedErr: nil}, + {name: "No modification, expectation unsatisfied", graceSeconds: 10, modified: false, err: nil, expectedRetry: true, expectedErr: nil}, + {name: "Function returns error", graceSeconds: 10, modified: false, err: errors.New("test error"), expectedRetry: true, expectedErr: errors.New("test error")}, + } + + graceExpectations := NewGraceExpectations() + + for _, cs := range tests { + t.Run(cs.name, func(t *testing.T) { + f := func() (bool, error) { + return cs.modified, cs.err + } + + retry, _, err := runWithGraceSeconds(graceExpectations, "testKey", "create", cs.graceSeconds, f) + if retry != cs.expectedRetry { + t.Errorf("expected retry: %v, got: %v", cs.expectedRetry, retry) + } + + if !equalErr(err, cs.expectedErr) { + t.Errorf("expected error: %v, got: %v", cs.expectedErr, err) + } + }) + } + + // Additional test to verify the timeout behavior + t.Run("Satisfaction of expectation over time", func(t *testing.T) { + graceSeconds := int32(1) // 1 second grace period + graceExpectations := NewGraceExpectations() + + f := func() (bool, error) { + return true, nil + } + + retry, _, err := runWithGraceSeconds(graceExpectations, "testKey2", "delete", graceSeconds, f) + if retry != true { + t.Errorf("expected retry: true after modification, got: %v", retry) + } + if err != nil { + t.Errorf("expected no error, got: %v", err) + } + + // Wait for the grace period to be satisfied + time.Sleep(time.Duration(graceSeconds) * time.Second) + + f2 := func() (bool, error) { + return false, nil + } + retry, _, err = runWithGraceSeconds(graceExpectations, "testKey2", "delete", graceSeconds, f2) + if retry != false { + t.Errorf("expected retry: false after grace period satisfied, got: %v", retry) + } + if err != nil { + t.Errorf("expected no error, got: %v", err) + } + }) +} + +// Comprehensive test for RunWithGraceSeconds using testFunction +func TestRunWithGraceSecondsComprehensive(t *testing.T) { + graceExpectations := NewGraceExpectations() + + tests := []struct { + name string + expectedState int + graceSeconds int32 + expectedRetry bool + expectedErr error + }{ + {"No modification needed", 0, 0, false, nil}, + {"Initial modification, create expectation", 1, 10, true, nil}, + {"Function errors", -1, 10, true, errors.New("test error")}, + } + + for _, cs := range tests { + t.Run(cs.name, func(t *testing.T) { + f := testFunction(cs.expectedState) + + retry, _, err := runWithGraceSeconds(graceExpectations, cs.name, "create", cs.graceSeconds, f) + if !equalErr(err, cs.expectedErr) { + t.Errorf("%s: expected error: %v, but got none", cs.name, cs.expectedErr) + } + + if retry != cs.expectedRetry { + t.Errorf("%s: expected retry: %v, got: %v", cs.name, cs.expectedRetry, retry) + } + }) + } +} + +// Test function generator +func testFunction(expectedState int) func() (bool, error) { + currentState := 0 + return func() (bool, error) { + if expectedState < 0 { + return true, errors.New("test error") + } + if currentState == expectedState { + return false, nil + } else { + currentState++ + return true, nil + } + } +} + +func equalErr(err1, err2 error) bool { + if err1 == nil && err2 == nil { + return true + } + if err1 != nil && err2 != nil { + return true + } + return false +} diff --git a/pkg/util/rollout_utils.go b/pkg/util/rollout_utils.go index 4bfc09e7..33a2b0d9 100644 --- a/pkg/util/rollout_utils.go +++ b/pkg/util/rollout_utils.go @@ -190,13 +190,3 @@ func CheckNextBatchIndexWithCorrect(rollout *rolloutv1beta1.Rollout) { } } } - -func GracePeriodSecondsOrDefault(refs []rolloutv1beta1.TrafficRoutingRef, defaultSeconds int32) int32 { - if len(refs) == 0 { - return defaultSeconds - } - if refs[0].GracePeriodSeconds < 0 { - return defaultSeconds - } - return refs[0].GracePeriodSeconds -}