From a3f27825190964b1407db6eab6bebf574a5e3474 Mon Sep 17 00:00:00 2001 From: dbug-dk <402230675@qq.com> Date: Thu, 14 Mar 2024 15:15:38 +0800 Subject: [PATCH 1/7] 1. refactor podUpdater interface. 2. fix bug: replace pod by to-replace label, not check new pod status when pod upgrade policy is not replace update --- .../collaset/collaset_controller_test.go | 33 +- .../collaset/synccontrol/sync_control.go | 190 +-------- .../collaset/synccontrol/update.go | 361 +++++++++++++++++- 3 files changed, 389 insertions(+), 195 deletions(-) diff --git a/pkg/controllers/collaset/collaset_controller_test.go b/pkg/controllers/collaset/collaset_controller_test.go index 1112799a..1ff9b674 100644 --- a/pkg/controllers/collaset/collaset_controller_test.go +++ b/pkg/controllers/collaset/collaset_controller_test.go @@ -615,7 +615,7 @@ var _ = Describe("collaset controller", func() { } }) - It("replace update reconcile by partition", func() { + It("[replace update] reconcile by partition", func() { testcase := "test-replace-update-by-partition" Expect(createNamespace(c, testcase)).Should(BeNil()) @@ -750,7 +750,7 @@ var _ = Describe("collaset controller", func() { Expect(inDeleteReplicas).Should(BeEquivalentTo(4)) }) - It("replace update reconcile by label", func() { + It("[replace update] reconcile by label", func() { testcase := "test-replace-update-by-label" Expect(createNamespace(c, testcase)).Should(BeNil()) @@ -986,7 +986,7 @@ var _ = Describe("collaset controller", func() { // double check updated pod replicas Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil()) - var replacePairNewId, newPodInstanceId string + var replacePairNewId, newPodInstanceId, newCreatePodName string for _, pod := range podList.Items { Expect(pod.Labels).ShouldNot(BeNil()) // replace by current revision @@ -996,6 +996,7 @@ var _ = Describe("collaset controller", func() { replacePairNewId = pod.Labels[appsv1alpha1.PodReplacePairNewId] Expect(replacePairNewId).ShouldNot(BeNil()) } else { + newCreatePodName = pod.Name newPodInstanceId = pod.Labels[appsv1alpha1.PodInstanceIDLabelKey] Expect(pod.Labels[appsv1alpha1.PodReplacePairOriginName]).Should(BeEquivalentTo(replacePod.Name)) } @@ -1003,6 +1004,32 @@ var _ = Describe("collaset controller", func() { Expect(replacePairNewId).ShouldNot(BeEquivalentTo("")) Expect(newPodInstanceId).ShouldNot(BeEquivalentTo("")) Expect(newPodInstanceId).Should(BeEquivalentTo(replacePairNewId)) + Expect(newCreatePodName).ShouldNot(BeEquivalentTo("")) + + Expect(updatePodWithRetry(c, replacePod.Namespace, newCreatePodName, func(pod *corev1.Pod) bool { + if pod.Labels == nil { + pod.Labels = map[string]string{} + } + pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true" + return true + })).Should(BeNil()) + + Eventually(func() error { + // check updated pod replicas by CollaSet status + return expectedStatusReplicas(c, cs, 0, 0, 1, 2, 0, 0, 0, 0) + }, 30*time.Second, 1*time.Second).Should(BeNil()) + + Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil()) + for _, pod := range podList.Items { + Expect(pod.Labels).ShouldNot(BeNil()) + // replace by current revision + Expect(pod.Labels[appsv1.ControllerRevisionHashLabelKey]).Should(BeEquivalentTo(cs.Status.CurrentRevision)) + Expect(pod.Spec.Containers[0].Image).Should(BeEquivalentTo("nginx:v1")) + if pod.Name == replacePod.Name { + _, exist := pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey] + Expect(exist).Should(BeTrue()) + } + } }) It("replace update change to inplaceUpdate", func() { diff --git a/pkg/controllers/collaset/synccontrol/sync_control.go b/pkg/controllers/collaset/synccontrol/sync_control.go index 362e4944..e67107a7 100644 --- a/pkg/controllers/collaset/synccontrol/sync_control.go +++ b/pkg/controllers/collaset/synccontrol/sync_control.go @@ -339,10 +339,15 @@ func dealReplacePods(pods []*corev1.Pod, instance *appsv1alpha1.CollaSet) (needR needCleanLabels = []string{appsv1alpha1.PodReplaceIndicationLabelKey, appsv1alpha1.PodReplaceByReplaceUpdateLabelKey} } - // pod is replace new created pod, skip replace if originPodName, exist := pod.Labels[appsv1alpha1.PodReplacePairOriginName]; exist { - if _, exist := podNameMap[originPodName]; !exist { + // replace pair origin pod is not exist, clean label. + if originPod, exist := podNameMap[originPodName]; !exist { needCleanLabels = append(needCleanLabels, appsv1alpha1.PodReplacePairOriginName) + } else if !replaceByUpdate { + // not replace update, delete origin pod when new created pod is service available + if _, serviceAvailable := pod.Labels[appsv1alpha1.PodServiceAvailableLabel]; serviceAvailable { + needDeletePods = append(needDeletePods, originPod) + } } } @@ -663,124 +668,17 @@ func (r *RealSyncControl) Update( // 2. decide Pod update candidates podToUpdate := decidePodToUpdate(cls, podUpdateInfos) podCh := make(chan *PodUpdateInfo, len(podToUpdate)) - updater := newPodUpdater(ctx, r.client, cls) - updating := false - analysedPod := sets.NewString() - - if cls.Spec.UpdateStrategy.PodUpdatePolicy != appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType { - // 3. prepare Pods to begin PodOpsLifecycle - for i, podInfo := range podToUpdate { - if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged { - continue - } - - if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) { - continue - } - podCh <- podToUpdate[i] - } - - // 4. begin podOpsLifecycle parallel - - succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error { - podInfo := <-podCh - // fulfill Pod update information - if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil { - return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err) - } - analysedPod.Insert(podInfo.Name) - logger.V(1).Info("try to begin PodOpsLifecycle for updating Pod of CollaSet", "pod", commonutils.ObjectKeyString(podInfo.Pod)) - if updated, err := podopslifecycle.Begin(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) { - if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport { - return podopslifecycle.WhenBeginDelete(obj) - } - return false, nil - }); err != nil { - return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) - } else if updated { - // add an expectation for this pod update, before next reconciling - if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { - return err - } - } - - return nil - }) - - updating = updating || succCount > 0 - if err != nil { - collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error()) - return updating, nil, err - } else { - collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "") - } - - needUpdateContext := false - for i := range podToUpdate { - podInfo := podToUpdate[i] - requeueAfter, allowed := podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, realValue(cls.Spec.UpdateStrategy.OperationDelaySeconds), podInfo.Pod) - if !allowed { - r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "Pod %s is not allowed to update", commonutils.ObjectKeyString(podInfo.Pod)) - continue - } - if requeueAfter != nil { - r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "delay Pod update for %d seconds", requeueAfter.Seconds()) - if recordedRequeueAfter == nil || *requeueAfter < *recordedRequeueAfter { - recordedRequeueAfter = requeueAfter - } - continue - } - - if !ownedIDs[podInfo.ID].Contains(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) { - needUpdateContext = true - ownedIDs[podInfo.ID].Put(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) - } - if podInfo.PodDecorationChanged { - decorationStr := utilspoddecoration.GetDecorationInfoString(podInfo.UpdatedPodDecorations) - if val, ok := ownedIDs[podInfo.ID].Get(podcontext.PodDecorationRevisionKey); !ok || val != decorationStr { - needUpdateContext = true - ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr) - } - } + updater := newPodUpdater(ctx, r.client, cls, r.podControl, r.recorder) - if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged { - continue - } - // if Pod has not been updated, update it. - podCh <- podToUpdate[i] - } - - // 5. mark Pod to use updated revision before updating it. - if needUpdateContext { - logger.V(1).Info("try to update ResourceContext for CollaSet") - err = retry.RetryOnConflict(retry.DefaultRetry, func() error { - return podcontext.UpdateToPodContext(r.client, cls, ownedIDs) - }) - - if err != nil { - collasetutils.AddOrUpdateCondition(resources.NewStatus, - appsv1alpha1.CollaSetScale, err, "UpdateFailed", - fmt.Sprintf("fail to update Context for updating: %s", err)) - return updating, recordedRequeueAfter, err - } else { - collasetutils.AddOrUpdateCondition(resources.NewStatus, - appsv1alpha1.CollaSetScale, nil, "UpdateFailed", "") - } - } - } else { - for i, podInfo := range podToUpdate { - // The pod is in a "replaceUpdate" state, always requires further update processing. - if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged { - continue - } - podCh <- podToUpdate[i] - } + updating, recordedRequeueAfter, err := updater.PrepareAndFilterPodUpdate(podToUpdate, resources, ownedIDs, podCh) + if err != nil { + return updating, recordedRequeueAfter, err } // 6. update Pod succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(_ int, _ error) error { podInfo := <-podCh - if !analysedPod.Has(podInfo.Name) { + if !podInfo.analysed { if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil { return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err) } @@ -793,41 +691,9 @@ func (r *RealSyncControl) Update( "inPlaceUpdate", podInfo.InPlaceUpdateSupport, "onlyMetadataChanged", podInfo.OnlyMetadataChanged, ) - if podInfo.OnlyMetadataChanged || podInfo.InPlaceUpdateSupport { - // 6.1 if pod template changes only include metadata or support in-place update, just apply these changes to pod directly - if err = r.podControl.UpdatePod(podInfo.UpdatedPod); err != nil { - return fmt.Errorf("fail to update Pod %s/%s when updating by in-place: %s", podInfo.Namespace, podInfo.Name, err) - } else { - podInfo.Pod = podInfo.UpdatedPod - r.recorder.Eventf(podInfo.Pod, - corev1.EventTypeNormal, - "UpdatePod", - "succeed to update Pod %s/%s to from revision %s to revision %s by in-place", - podInfo.Namespace, podInfo.Name, - podInfo.CurrentRevision.Name, - resources.UpdatedRevision.Name) - if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.UpdatedPod.ResourceVersion); err != nil { - return err - } - } - } else if cls.Spec.UpdateStrategy.PodUpdatePolicy == appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType { - return nil - } else { - // 6.2 if pod has changes not in-place supported, recreate it - if err = r.podControl.DeletePod(podInfo.Pod); err != nil { - return fmt.Errorf("fail to delete Pod %s/%s when updating by recreate: %s", podInfo.Namespace, podInfo.Name, err) - } else { - r.recorder.Eventf(podInfo.Pod, - corev1.EventTypeNormal, - "UpdatePod", - "succeed to update Pod %s/%s to from revision %s to revision %s by recreate", - podInfo.Namespace, - podInfo.Name, - podInfo.CurrentRevision.Name, resources.UpdatedRevision.Name) - if err := collasetutils.ActiveExpectations.ExpectDelete(cls, expectations.Pod, podInfo.Name); err != nil { - return err - } - } + + if err = updater.UpgradePod(podInfo); err != nil { + return err } return nil @@ -856,29 +722,9 @@ func (r *RealSyncControl) Update( } if finished { - if podInfo.isInReplacing { - replacePairNewPodInfo := podInfo.replacePairNewPodInfo - if replacePairNewPodInfo != nil { - if _, exist := replacePairNewPodInfo.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]; !exist { - patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano()))) - if err = r.podControl.PatchPod(podInfo.Pod, patch); err != nil { - return fmt.Errorf("failed to delete replace pair origin pod %s/%s %s", podInfo.Namespace, podInfo.replacePairNewPodInfo.Name, err) - } - } - } - } else { - logger.V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod)) - if updated, err := podopslifecycle.Finish(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil { - return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) - } else if updated { - // add an expectation for this pod update, before next reconciling - if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { - return err - } - r.recorder.Eventf(podInfo.Pod, - corev1.EventTypeNormal, - "UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name) - } + err := updater.FinishUpdatePod(podInfo) + if err != nil { + return err } } else { r.recorder.Eventf(podInfo.Pod, diff --git a/pkg/controllers/collaset/synccontrol/update.go b/pkg/controllers/collaset/synccontrol/update.go index 2581ed78..89ea1693 100644 --- a/pkg/controllers/collaset/synccontrol/update.go +++ b/pkg/controllers/collaset/synccontrol/update.go @@ -29,14 +29,20 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/retry" "sigs.k8s.io/controller-runtime/pkg/client" appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1" + "kusionstack.io/operating/pkg/controllers/collaset/podcontext" + "kusionstack.io/operating/pkg/controllers/collaset/podcontrol" "kusionstack.io/operating/pkg/controllers/collaset/utils" collasetutils "kusionstack.io/operating/pkg/controllers/collaset/utils" controllerutils "kusionstack.io/operating/pkg/controllers/utils" + "kusionstack.io/operating/pkg/controllers/utils/expectations" utilspoddecoration "kusionstack.io/operating/pkg/controllers/utils/poddecoration" "kusionstack.io/operating/pkg/controllers/utils/podopslifecycle" + commonutils "kusionstack.io/operating/pkg/utils" ) type PodUpdateInfo struct { @@ -51,6 +57,8 @@ type PodUpdateInfo struct { IsUpdatedRevision bool // carry the pod's current revision CurrentRevision *appsv1.ControllerRevision + // carry the desired update revision + UpdateRevision *appsv1.ControllerRevision // indicates effected PodDecorations changed PodDecorationChanged bool @@ -70,6 +78,9 @@ type PodUpdateInfo struct { // replace origin pod replacePairOriginPodName string + + // after fulfillPodUpdatedInfo, set value as true. to avoid fulfillPodUpdatedInfo multiple times + analysed bool } func attachPodUpdateInfo(ctx context.Context, pods []*collasetutils.PodWrapper, resource *collasetutils.RelatedResources) ([]*PodUpdateInfo, error) { @@ -105,6 +116,7 @@ func attachPodUpdateInfo(ctx context.Context, pods []*collasetutils.PodWrapper, updateInfo.CurrentPodDecorations = currentPDs updateInfo.UpdatedPodDecorations = updatedPDs + updateInfo.UpdateRevision = resource.UpdatedRevision // decide this pod current revision, or nil if not indicated if pod.Labels != nil { currentRevisionName, exist := pod.Labels[appsv1.ControllerRevisionHashLabelKey] @@ -253,23 +265,26 @@ func (o orderByDefault) Less(i, j int) bool { } type PodUpdater interface { + PrepareAndFilterPodUpdate(podToUpdate []*PodUpdateInfo, resources *collasetutils.RelatedResources, + ownedIDs map[int]*appsv1alpha1.ContextDetail, podCh chan *PodUpdateInfo) (bool, *time.Duration, error) FulfillPodUpdatedInfo(revision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) error + UpgradePod(podInfo *PodUpdateInfo) error GetPodUpdateFinishStatus(podUpdateInfo *PodUpdateInfo) (bool, string, error) + FinishUpdatePod(podInfo *PodUpdateInfo) error } -func newPodUpdater(ctx context.Context, client client.Client, cls *appsv1alpha1.CollaSet) PodUpdater { +func newPodUpdater(ctx context.Context, client client.Client, cls *appsv1alpha1.CollaSet, podControl podcontrol.Interface, recorder record.EventRecorder) PodUpdater { switch cls.Spec.UpdateStrategy.PodUpdatePolicy { case appsv1alpha1.CollaSetRecreatePodUpdateStrategyType: - // TODO: recreatePodUpdater - return &recreatePodUpdater{} + return &recreatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder} case appsv1alpha1.CollaSetInPlaceOnlyPodUpdateStrategyType: // In case of using native K8s, Pod is only allowed to update with container image, so InPlaceOnly policy is // implemented with InPlaceIfPossible policy as default for compatibility. - return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client} + return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder} case appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType: - return &replaceUpdatePodUpdater{collaSet: cls, ctx: ctx, Client: client} + return &replaceUpdatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder} default: - return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client} + return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder} } } @@ -283,15 +298,69 @@ type ContainerStatus struct { } type inPlaceIfPossibleUpdater struct { - collaSet *appsv1alpha1.CollaSet - ctx context.Context + collaSet *appsv1alpha1.CollaSet + ctx context.Context + podControl podcontrol.Interface + recorder record.EventRecorder client.Client } +func (u *inPlaceIfPossibleUpdater) PrepareAndFilterPodUpdate(podToUpdate []*PodUpdateInfo, resources *collasetutils.RelatedResources, ownedIDs map[int]*appsv1alpha1.ContextDetail, podCh chan *PodUpdateInfo) (bool, *time.Duration, error) { + // 1. prepare Pods to begin PodOpsLifecycle, filter updated revision pods or is already during pods + filterPods(podToUpdate, podCh) + // 2. begin podOpsLifecycle parallel + var recordedRequeueAfter *time.Duration + succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error { + podInfo := <-podCh + // fulfill Pod update information + if err := u.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil { + return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err) + } + + u.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "PodUpdateLifecycle", "try to begin PodOpsLifecycle for updating Pod of CollaSet") + if updated, err := podopslifecycle.Begin(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) { + if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport { + return podopslifecycle.WhenBeginDelete(obj) + } + return false, nil + }); err != nil { + return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) + } else if updated { + // add an expectation for this pod update, before next reconciling + if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { + return err + } + } + + return nil + }) + + updating := succCount > 0 + if err != nil { + collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error()) + return updating, nil, err + } else { + collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "") + } + // 3. judge whether it need to update podContext + recordedRequeueAfter, err = filterPodsAndUpdatePodContext(u.collaSet, podToUpdate, u.recorder, ownedIDs, resources, podCh, u.Client) + if err != nil { + collasetutils.AddOrUpdateCondition(resources.NewStatus, + appsv1alpha1.CollaSetScale, err, "UpdateFailed", + fmt.Sprintf("fail to update Context for updating: %s", err)) + return updating, recordedRequeueAfter, err + } else { + collasetutils.AddOrUpdateCondition(resources.NewStatus, + appsv1alpha1.CollaSetScale, nil, "UpdateFailed", "") + } + return updating, recordedRequeueAfter, nil +} + func (u *inPlaceIfPossibleUpdater) FulfillPodUpdatedInfo( updatedRevision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) error { + podUpdateInfo.analysed = true // 1. build pod from current and updated revision ownerRef := metav1.NewControllerRef(u.collaSet, appsv1alpha1.GroupVersion.WithKind("CollaSet")) // TODO: use cache @@ -363,6 +432,50 @@ func (u *inPlaceIfPossibleUpdater) FulfillPodUpdatedInfo( return nil } +func (u *inPlaceIfPossibleUpdater) UpgradePod(podInfo *PodUpdateInfo) error { + if podInfo.OnlyMetadataChanged || podInfo.InPlaceUpdateSupport { + // 6.1 if pod template changes only include metadata or support in-place update, just apply these changes to pod directly + if err := u.podControl.UpdatePod(podInfo.UpdatedPod); err != nil { + return fmt.Errorf("fail to update Pod %s/%s when updating by in-place: %s", podInfo.Namespace, podInfo.Name, err) + } else { + podInfo.Pod = podInfo.UpdatedPod + u.recorder.Eventf(podInfo.Pod, + corev1.EventTypeNormal, + "UpdatePod", + "succeed to update Pod %s/%s to from revision %s to revision %s by in-place", + podInfo.Namespace, podInfo.Name, + podInfo.CurrentRevision.Name, + podInfo.UpdateRevision.Name) + if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.UpdatedPod.ResourceVersion); err != nil { + return err + } + } + } else { + // 6.2 if pod has changes not in-place supported, recreate it + return recreatePod(u.collaSet, podInfo, u.podControl, u.recorder) + } + return nil +} + +func recreatePod(collaSet *appsv1alpha1.CollaSet, podInfo *PodUpdateInfo, podControl podcontrol.Interface, recorder record.EventRecorder) error { + if err := podControl.DeletePod(podInfo.Pod); err != nil { + return fmt.Errorf("fail to delete Pod %s/%s when updating by recreate: %s", podInfo.Namespace, podInfo.Name, err) + } + recorder.Eventf(podInfo.Pod, + corev1.EventTypeNormal, + "UpdatePod", + "succeed to update Pod %s/%s to from revision %s to revision %s by recreate", + podInfo.Namespace, + podInfo.Name, + podInfo.CurrentRevision.Name, + podInfo.UpdateRevision.Name) + if err := collasetutils.ActiveExpectations.ExpectDelete(collaSet, expectations.Pod, podInfo.Name); err != nil { + return err + } + + return nil +} + func (u inPlaceIfPossibleUpdater) diffPod(currentPod, updatedPod *corev1.Pod) (inPlaceSetUpdateSupport bool, onlyMetadataChanged bool) { if len(currentPod.Spec.Containers) != len(updatedPod.Spec.Containers) { return false, false @@ -459,12 +572,35 @@ func (u *inPlaceIfPossibleUpdater) GetPodUpdateFinishStatus(podUpdateInfo *PodUp return true, "", nil } +func (u *inPlaceIfPossibleUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error { + //u.recorder.Eventf().V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod)) + if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil { + return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) + } else if updated { + // add an expectation for this pod update, before next reconciling + if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { + return err + } + u.recorder.Eventf(podInfo.Pod, + corev1.EventTypeNormal, + "UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name) + } + return nil +} + // TODO type inPlaceOnlyPodUpdater struct { } +func (u *inPlaceOnlyPodUpdater) PrepareAndFilterPodUpdate(podToUpdate []*PodUpdateInfo, resources *collasetutils.RelatedResources, ownedIDs map[int]*appsv1alpha1.ContextDetail, podCh chan *PodUpdateInfo) (bool, *time.Duration, error) { + return false, nil, nil +} + func (u *inPlaceOnlyPodUpdater) FulfillPodUpdatedInfo(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) (inPlaceUpdateSupport bool, onlyMetadataChanged bool, updatedPod *corev1.Pod, err error) { + return +} +func (u *inPlaceOnlyPodUpdater) UpgradePod(_ *appsv1alpha1.CollaSet, _ *PodUpdateInfo) (err error) { return } @@ -473,52 +609,159 @@ func (u *inPlaceOnlyPodUpdater) GetPodUpdateFinishStatus(_ *PodUpdateInfo) (fini } type recreatePodUpdater struct { + collaSet *appsv1alpha1.CollaSet + ctx context.Context + podControl podcontrol.Interface + recorder record.EventRecorder + client.Client +} + +func (u *recreatePodUpdater) PrepareAndFilterPodUpdate(podToUpdate []*PodUpdateInfo, resources *collasetutils.RelatedResources, ownedIDs map[int]*appsv1alpha1.ContextDetail, podCh chan *PodUpdateInfo) (bool, *time.Duration, error) { + // 1. prepare Pods to begin PodOpsLifecycle, filter updated revision pods or is already during pods + filterPods(podToUpdate, podCh) + // 2. begin podOpsLifecycle parallel + var recordedRequeueAfter *time.Duration + succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error { + podInfo := <-podCh + // fulfill Pod update information + if err := u.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil { + return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err) + } + + //logger.V(1).Info("try to begin PodOpsLifecycle for updating Pod of CollaSet", "pod", commonutils.ObjectKeyString(podInfo.Pod)) + if updated, err := podopslifecycle.Begin(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) { + if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport { + return podopslifecycle.WhenBeginDelete(obj) + } + return false, nil + }); err != nil { + return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) + } else if updated { + // add an expectation for this pod update, before next reconciling + if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { + return err + } + } + + return nil + }) + + updating := succCount > 0 + if err != nil { + collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error()) + return updating, nil, err + } else { + collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "") + } + // 3. judge whether it need to update podContext + recordedRequeueAfter, err = filterPodsAndUpdatePodContext(u.collaSet, podToUpdate, u.recorder, ownedIDs, resources, podCh, u.Client) + if err != nil { + collasetutils.AddOrUpdateCondition(resources.NewStatus, + appsv1alpha1.CollaSetScale, err, "UpdateFailed", + fmt.Sprintf("fail to update Context for updating: %s", err)) + return updating, recordedRequeueAfter, err + } else { + collasetutils.AddOrUpdateCondition(resources.NewStatus, + appsv1alpha1.CollaSetScale, nil, "UpdateFailed", "") + } + return updating, nil, nil } func (u *recreatePodUpdater) FulfillPodUpdatedInfo(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) error { return nil } +func (u *recreatePodUpdater) UpgradePod(podInfo *PodUpdateInfo) error { + return recreatePod(u.collaSet, podInfo, u.podControl, u.recorder) +} + func (u *recreatePodUpdater) GetPodUpdateFinishStatus(podInfo *PodUpdateInfo) (finished bool, msg string, err error) { // Recreate policy alway treat Pod as update finished return podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged, "", nil } +func (u *recreatePodUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error { + //u.recorder.Eventf().V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod)) + if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil { + return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) + } else if updated { + // add an expectation for this pod update, before next reconciling + if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { + return err + } + u.recorder.Eventf(podInfo.Pod, + corev1.EventTypeNormal, + "UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name) + } + return nil +} + type replaceUpdatePodUpdater struct { - collaSet *appsv1alpha1.CollaSet - ctx context.Context + collaSet *appsv1alpha1.CollaSet + ctx context.Context + podControl podcontrol.Interface + recorder record.EventRecorder client.Client } +func (u *replaceUpdatePodUpdater) PrepareAndFilterPodUpdate(podToUpdate []*PodUpdateInfo, _ *collasetutils.RelatedResources, _ map[int]*appsv1alpha1.ContextDetail, podCh chan *PodUpdateInfo) (bool, *time.Duration, error) { + // The pod is in a "replaceUpdate" state, only filter these pods that already updated revision. + for i, podInfo := range podToUpdate { + if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged { + continue + } + podCh <- podToUpdate[i] + } + return false, nil, nil +} + func (u *replaceUpdatePodUpdater) FulfillPodUpdatedInfo(updatedRevision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) (err error) { // when replaceUpdate, inPlaceUpdateSupport and onlyMetadataChanged always false - - // 1. judge replace pair new pod is updated revision, if not, delete. + // judge replace pair new pod is updated revision, if not, delete. + podUpdateInfo.analysed = true if podUpdateInfo.replacePairNewPodInfo != nil { - newPodRevision, exist := podUpdateInfo.replacePairNewPodInfo.Pod.Labels[appsv1.ControllerRevisionHashLabelKey] + replacePairNewPod := podUpdateInfo.replacePairNewPodInfo.Pod + newPodRevision, exist := replacePairNewPod.Labels[appsv1.ControllerRevisionHashLabelKey] if exist && newPodRevision == updatedRevision.Name { return } + u.recorder.Eventf(podUpdateInfo.Pod, + corev1.EventTypeNormal, + "ReplaceUpdatePod", + "label to-delete on new pair pod %s/%s because it is not updated revision, current revision: %s, updated revision: %s", + replacePairNewPod.Namespace, + replacePairNewPod.Name, + newPodRevision, + updatedRevision.Name) patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano()))) if err = u.Patch(u.ctx, podUpdateInfo.replacePairNewPodInfo.Pod, patch); err != nil { err = fmt.Errorf("failed to delete replace pair new pod %s/%s %s", podUpdateInfo.replacePairNewPodInfo.Namespace, podUpdateInfo.replacePairNewPodInfo.Name, err) return } - return } - if _, exist := podUpdateInfo.Pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]; !exist { + return +} + +func (u *replaceUpdatePodUpdater) UpgradePod(podInfo *PodUpdateInfo) error { + // add replace indicate label only and wait to replace when syncPods + if _, exist := podInfo.Pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]; !exist { // need replace pod, label pod with replace-indicate now := time.Now().UnixNano() patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%v", "%s": "%v"}}}`, appsv1alpha1.PodReplaceIndicationLabelKey, now, appsv1alpha1.PodReplaceByReplaceUpdateLabelKey, true))) - if err = u.Patch(u.ctx, podUpdateInfo.Pod, patch); err != nil { - err = fmt.Errorf("fail to label origin pod %s/%s with replace indicate label by replaceUpdate: %s", podUpdateInfo.Namespace, podUpdateInfo.Name, err) - return + if err := u.Patch(u.ctx, podInfo.Pod, patch); err != nil { + return fmt.Errorf("fail to label origin pod %s/%s with replace indicate label by replaceUpdate: %s", podInfo.Namespace, podInfo.Name, err) } + u.recorder.Eventf(podInfo.Pod, + corev1.EventTypeNormal, + "UpdatePod", + "succeed to update Pod %s/%s by label to-replace", + podInfo.Namespace, + podInfo.Name, + ) } - - return + return nil } func (u *replaceUpdatePodUpdater) GetPodUpdateFinishStatus(podUpdateInfo *PodUpdateInfo) (finished bool, msg string, err error) { @@ -530,6 +773,19 @@ func (u *replaceUpdatePodUpdater) GetPodUpdateFinishStatus(podUpdateInfo *PodUpd return isPodUpdatedServiceAvailable(replaceNewPodInfo) } +func (u *replaceUpdatePodUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error { + replacePairNewPodInfo := podInfo.replacePairNewPodInfo + if replacePairNewPodInfo != nil { + if _, exist := podInfo.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]; !exist { + patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano()))) + if err := u.podControl.PatchPod(podInfo.Pod, patch); err != nil { + return fmt.Errorf("failed to delete replace pair origin pod %s/%s %s", podInfo.Namespace, podInfo.replacePairNewPodInfo.Name, err) + } + } + } + return nil +} + func isPodUpdatedServiceAvailable(podInfo *PodUpdateInfo) (finished bool, msg string, err error) { if !podInfo.IsUpdatedRevision || podInfo.PodDecorationChanged { return false, "not updated revision", nil @@ -548,3 +804,68 @@ func isPodUpdatedServiceAvailable(podInfo *PodUpdateInfo) (finished bool, msg st return false, "pod not service available", nil } + +// prepare Pods to begin PodOpsLifecycle, filter updated revision pods or is already during pods +func filterPods(podToUpdate []*PodUpdateInfo, podCh chan *PodUpdateInfo) { + for i, podInfo := range podToUpdate { + if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged { + continue + } + + if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) { + continue + } + podCh <- podToUpdate[i] + } +} + +func filterPodsAndUpdatePodContext(cls *appsv1alpha1.CollaSet, podToUpdate []*PodUpdateInfo, recorder record.EventRecorder, + ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo, + client client.Client) (*time.Duration, error) { + var recordedRequeueAfter *time.Duration + // judge whether it need to update podContext + needUpdateContext := false + for i := range podToUpdate { + podInfo := podToUpdate[i] + requeueAfter, allowed := podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, realValue(cls.Spec.UpdateStrategy.OperationDelaySeconds), podInfo.Pod) + if !allowed { + recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "Pod %s is not allowed to update", commonutils.ObjectKeyString(podInfo.Pod)) + continue + } + if requeueAfter != nil { + recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "delay Pod update for %d seconds", requeueAfter.Seconds()) + if recordedRequeueAfter == nil || *requeueAfter < *recordedRequeueAfter { + recordedRequeueAfter = requeueAfter + } + continue + } + + if !ownedIDs[podInfo.ID].Contains(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) { + needUpdateContext = true + ownedIDs[podInfo.ID].Put(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) + } + if podInfo.PodDecorationChanged { + decorationStr := utilspoddecoration.GetDecorationInfoString(podInfo.UpdatedPodDecorations) + if val, ok := ownedIDs[podInfo.ID].Get(podcontext.PodDecorationRevisionKey); !ok || val != decorationStr { + needUpdateContext = true + ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr) + } + } + + if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged { + continue + } + // if Pod has not been updated, update it. + podCh <- podToUpdate[i] + } + + // 4. mark Pod to use updated revision before updating it. + if needUpdateContext { + recorder.Eventf(cls, corev1.EventTypeNormal, "UpdateToPodContext", "try to update ResourceContext for CollaSet") + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + return podcontext.UpdateToPodContext(client, cls, ownedIDs) + }) + return recordedRequeueAfter, err + } + return recordedRequeueAfter, nil +} From 0b9c39c1975469529d3e3899d8e195732081181d Mon Sep 17 00:00:00 2001 From: dbug-dk <402230675@qq.com> Date: Fri, 29 Mar 2024 09:55:51 +0800 Subject: [PATCH 2/7] remove some comments --- pkg/controllers/collaset/collaset_controller.go | 2 +- pkg/controllers/collaset/synccontrol/update.go | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/controllers/collaset/collaset_controller.go b/pkg/controllers/collaset/collaset_controller.go index 7cd9aeba..c00a3bf8 100644 --- a/pkg/controllers/collaset/collaset_controller.go +++ b/pkg/controllers/collaset/collaset_controller.go @@ -107,7 +107,7 @@ func AddToMgr(mgr ctrl.Manager, r reconcile.Reconciler) error { } // +kubebuilder:rbac:groups=apps.kusionstack.io,resources=collasets,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=apps.kusionstack.io,resources=collasets/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=apps.kusionstack.io,resources=collasets/status,verbs=get;update;patchu // +kubebuilder:rbac:groups=apps.kusionstack.io,resources=collasets/finalizers,verbs=update // +kubebuilder:rbac:groups=apps.kusionstack.io,resources=resourcecontexts,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps.kusionstack.io,resources=resourcecontexts/status,verbs=get;update;patch diff --git a/pkg/controllers/collaset/synccontrol/update.go b/pkg/controllers/collaset/synccontrol/update.go index 89ea1693..13667898 100644 --- a/pkg/controllers/collaset/synccontrol/update.go +++ b/pkg/controllers/collaset/synccontrol/update.go @@ -628,7 +628,6 @@ func (u *recreatePodUpdater) PrepareAndFilterPodUpdate(podToUpdate []*PodUpdateI return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err) } - //logger.V(1).Info("try to begin PodOpsLifecycle for updating Pod of CollaSet", "pod", commonutils.ObjectKeyString(podInfo.Pod)) if updated, err := podopslifecycle.Begin(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) { if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport { return podopslifecycle.WhenBeginDelete(obj) @@ -664,7 +663,7 @@ func (u *recreatePodUpdater) PrepareAndFilterPodUpdate(podToUpdate []*PodUpdateI collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetScale, nil, "UpdateFailed", "") } - return updating, nil, nil + return updating, recordedRequeueAfter, nil } func (u *recreatePodUpdater) FulfillPodUpdatedInfo(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) error { @@ -681,7 +680,6 @@ func (u *recreatePodUpdater) GetPodUpdateFinishStatus(podInfo *PodUpdateInfo) (f } func (u *recreatePodUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error { - //u.recorder.Eventf().V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod)) if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil { return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) } else if updated { From 2871bb55da16cf3b512a1a733118428c2d8d54a9 Mon Sep 17 00:00:00 2001 From: dbug-dk <402230675@qq.com> Date: Thu, 11 Apr 2024 19:51:53 +0800 Subject: [PATCH 3/7] refactor filter and lifecycle func by cr --- .../collaset/synccontrol/sync_control.go | 8 +- .../collaset/synccontrol/update.go | 238 ++++++------------ 2 files changed, 75 insertions(+), 171 deletions(-) diff --git a/pkg/controllers/collaset/synccontrol/sync_control.go b/pkg/controllers/collaset/synccontrol/sync_control.go index a146341a..6606eea0 100644 --- a/pkg/controllers/collaset/synccontrol/sync_control.go +++ b/pkg/controllers/collaset/synccontrol/sync_control.go @@ -701,12 +701,13 @@ func (r *RealSyncControl) Update( return false, nil, fmt.Errorf("fail to attach pod update info, %v", err) } - // 3. decide Pod update candidates + // 2. decide Pod update candidates podToUpdate := decidePodToUpdate(cls, podUpdateInfos) podCh := make(chan *PodUpdateInfo, len(podToUpdate)) updater := newPodUpdater(ctx, r.client, cls, r.podControl, r.recorder) updating := false + // 3. filter already updated revision, for i, podInfo := range podToUpdate { if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged && !podInfo.PvcTmpHashChanged { continue @@ -721,14 +722,15 @@ func (r *RealSyncControl) Update( if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) { continue } + podCh <- podToUpdate[i] } - updating, err = updater.BeginUpdate(resources, podCh) + updating, err = updater.BeginUpdatePod(resources, podCh) if err != nil { return updating, recordedRequeueAfter, err } - recordedRequeueAfter, err = updater.FilterAllowOpsPodsAndUpdatePodContext(podToUpdate, ownedIDs, resources, podCh) + recordedRequeueAfter, err = updater.FilterAllowOpsPods(podToUpdate, ownedIDs, resources, podCh) if err != nil { collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetScale, err, "UpdateFailed", diff --git a/pkg/controllers/collaset/synccontrol/update.go b/pkg/controllers/collaset/synccontrol/update.go index 24d8c404..5ceef05b 100644 --- a/pkg/controllers/collaset/synccontrol/update.go +++ b/pkg/controllers/collaset/synccontrol/update.go @@ -268,39 +268,15 @@ func (o orderByDefault) Less(i, j int) bool { } type PodUpdater interface { - BeginUpdate(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) - FilterAllowOpsPodsAndUpdatePodContext(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error) FulfillPodUpdatedInfo(revision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) error + BeginUpdatePod(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) + FilterAllowOpsPods(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error) UpgradePod(podInfo *PodUpdateInfo) error GetPodUpdateFinishStatus(podUpdateInfo *PodUpdateInfo) (bool, string, error) FinishUpdatePod(podInfo *PodUpdateInfo) error } -func newPodUpdater(ctx context.Context, client client.Client, cls *appsv1alpha1.CollaSet, podControl podcontrol.Interface, recorder record.EventRecorder) PodUpdater { - switch cls.Spec.UpdateStrategy.PodUpdatePolicy { - case appsv1alpha1.CollaSetRecreatePodUpdateStrategyType: - return &recreatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder} - case appsv1alpha1.CollaSetInPlaceOnlyPodUpdateStrategyType: - // In case of using native K8s, Pod is only allowed to update with container image, so InPlaceOnly policy is - // implemented with InPlaceIfPossible policy as default for compatibility. - return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client} - case appsv1alpha1.CollaSetReplacePodUpdateStrategyType: - return &replaceUpdatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder} - default: - return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder} - } -} - -type PodStatus struct { - ContainerStates map[string]*ContainerStatus `json:"containerStates,omitempty"` -} - -type ContainerStatus struct { - LatestImage string `json:"latestImage,omitempty"` - LastImageID string `json:"lastImageID,omitempty"` -} - -type inPlaceIfPossibleUpdater struct { +type GenericPodUpdater struct { collaSet *appsv1alpha1.CollaSet ctx context.Context podControl podcontrol.Interface @@ -308,7 +284,7 @@ type inPlaceIfPossibleUpdater struct { client.Client } -func (u *inPlaceIfPossibleUpdater) BeginUpdate(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) { +func (u *GenericPodUpdater) BeginUpdatePod(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) { succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error { podInfo := <-podCh u.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "PodUpdateLifecycle", "try to begin PodOpsLifecycle for updating Pod of CollaSet") @@ -339,7 +315,7 @@ func (u *inPlaceIfPossibleUpdater) BeginUpdate(resources *collasetutils.RelatedR return updating, nil } -func (u *inPlaceIfPossibleUpdater) FilterAllowOpsPodsAndUpdatePodContext(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error) { +func (u *GenericPodUpdater) FilterAllowOpsPods(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error) { var recordedRequeueAfter *time.Duration needUpdateContext := false for i := range podToUpdate { @@ -386,6 +362,56 @@ func (u *inPlaceIfPossibleUpdater) FilterAllowOpsPodsAndUpdatePodContext(podToUp return recordedRequeueAfter, nil } +func (u *GenericPodUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error { + //u.recorder.Eventf().V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod)) + if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil { + return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) + } else if updated { + // add an expectation for this pod update, before next reconciling + if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { + return err + } + u.recorder.Eventf(podInfo.Pod, + corev1.EventTypeNormal, + "UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name) + } + return nil +} + +func newPodUpdater(ctx context.Context, client client.Client, cls *appsv1alpha1.CollaSet, podControl podcontrol.Interface, recorder record.EventRecorder) PodUpdater { + genericPodUpdater := &GenericPodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder} + switch cls.Spec.UpdateStrategy.PodUpdatePolicy { + case appsv1alpha1.CollaSetRecreatePodUpdateStrategyType: + return &recreatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder, GenericPodUpdater: *genericPodUpdater} + case appsv1alpha1.CollaSetInPlaceOnlyPodUpdateStrategyType: + // In case of using native K8s, Pod is only allowed to update with container image, so InPlaceOnly policy is + // implemented with InPlaceIfPossible policy as default for compatibility. + return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client} + case appsv1alpha1.CollaSetReplacePodUpdateStrategyType: + return &replaceUpdatePodUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder} + default: + return &inPlaceIfPossibleUpdater{collaSet: cls, ctx: ctx, Client: client, podControl: podControl, recorder: recorder, GenericPodUpdater: *genericPodUpdater} + } +} + +type PodStatus struct { + ContainerStates map[string]*ContainerStatus `json:"containerStates,omitempty"` +} + +type ContainerStatus struct { + LatestImage string `json:"latestImage,omitempty"` + LastImageID string `json:"lastImageID,omitempty"` +} + +type inPlaceIfPossibleUpdater struct { + collaSet *appsv1alpha1.CollaSet + ctx context.Context + podControl podcontrol.Interface + recorder record.EventRecorder + GenericPodUpdater + client.Client +} + func (u *inPlaceIfPossibleUpdater) FulfillPodUpdatedInfo( updatedRevision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) error { @@ -605,22 +631,6 @@ func (u *inPlaceIfPossibleUpdater) GetPodUpdateFinishStatus(podUpdateInfo *PodUp return true, "", nil } -func (u *inPlaceIfPossibleUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error { - //u.recorder.Eventf().V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod)) - if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil { - return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) - } else if updated { - // add an expectation for this pod update, before next reconciling - if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { - return err - } - u.recorder.Eventf(podInfo.Pod, - corev1.EventTypeNormal, - "UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name) - } - return nil -} - // TODO type inPlaceOnlyPodUpdater struct { } @@ -650,87 +660,10 @@ type recreatePodUpdater struct { ctx context.Context podControl podcontrol.Interface recorder record.EventRecorder + GenericPodUpdater client.Client } -func (u *recreatePodUpdater) BeginUpdate(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) { - succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error { - podInfo := <-podCh - u.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "PodUpdateLifecycle", "try to begin PodOpsLifecycle for updating Pod of CollaSet") - if updated, err := podopslifecycle.Begin(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) { - if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport { - return podopslifecycle.WhenBeginDelete(obj) - } - return false, nil - }); err != nil { - return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) - } else if updated { - // add an expectation for this pod update, before next reconciling - if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { - return err - } - } - - return nil - }) - - updating := succCount > 0 - if err != nil { - collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error()) - return updating, err - } else { - collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "") - } - return updating, nil -} - -func (u *recreatePodUpdater) FilterAllowOpsPodsAndUpdatePodContext(podToUpdate []*PodUpdateInfo, ownedIDs map[int]*appsv1alpha1.ContextDetail, resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (*time.Duration, error) { - var recordedRequeueAfter *time.Duration - needUpdateContext := false - for i := range podToUpdate { - podInfo := podToUpdate[i] - requeueAfter, allowed := podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, realValue(u.collaSet.Spec.UpdateStrategy.OperationDelaySeconds), podInfo.Pod) - if !allowed { - u.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "Pod %s is not allowed to update", commonutils.ObjectKeyString(podInfo.Pod)) - continue - } - if requeueAfter != nil { - u.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "delay Pod update for %d seconds", requeueAfter.Seconds()) - if recordedRequeueAfter == nil || *requeueAfter < *recordedRequeueAfter { - recordedRequeueAfter = requeueAfter - } - continue - } - - if !ownedIDs[podInfo.ID].Contains(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) { - needUpdateContext = true - ownedIDs[podInfo.ID].Put(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) - } - if podInfo.PodDecorationChanged { - decorationStr := utilspoddecoration.GetDecorationInfoString(podInfo.UpdatedPodDecorations) - if val, ok := ownedIDs[podInfo.ID].Get(podcontext.PodDecorationRevisionKey); !ok || val != decorationStr { - needUpdateContext = true - ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr) - } - } - - if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged { - continue - } - // if Pod has not been updated, update it. - podCh <- podToUpdate[i] - } - // 4. mark Pod to use updated revision before updating it. - if needUpdateContext { - u.recorder.Eventf(u.collaSet, corev1.EventTypeNormal, "UpdateToPodContext", "try to update ResourceContext for CollaSet") - err := retry.RetryOnConflict(retry.DefaultRetry, func() error { - return podcontext.UpdateToPodContext(u.Client, u.collaSet, ownedIDs) - }) - return recordedRequeueAfter, err - } - return recordedRequeueAfter, nil -} - func (u *recreatePodUpdater) FulfillPodUpdatedInfo(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) error { return nil } @@ -744,21 +677,6 @@ func (u *recreatePodUpdater) GetPodUpdateFinishStatus(podInfo *PodUpdateInfo) (f return podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged, "", nil } -func (u *recreatePodUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error { - if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil { - return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) - } else if updated { - // add an expectation for this pod update, before next reconciling - if err := collasetutils.ActiveExpectations.ExpectUpdate(u.collaSet, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { - return err - } - u.recorder.Eventf(podInfo.Pod, - corev1.EventTypeNormal, - "UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name) - } - return nil -} - type replaceUpdatePodUpdater struct { collaSet *appsv1alpha1.CollaSet ctx context.Context @@ -767,7 +685,7 @@ type replaceUpdatePodUpdater struct { client.Client } -func (u *replaceUpdatePodUpdater) BeginUpdate(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (updating bool, err error) { +func (u *replaceUpdatePodUpdater) BeginUpdatePod(resources *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (bool, error) { succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error { podInfo := <-podCh if podInfo.replacePairNewPodInfo != nil { @@ -785,10 +703,10 @@ func (u *replaceUpdatePodUpdater) BeginUpdate(resources *collasetutils.RelatedRe newPodRevision, resources.UpdatedRevision.Name) patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano()))) - if err = u.Patch(u.ctx, podInfo.replacePairNewPodInfo.Pod, patch); err != nil { - err = fmt.Errorf("failed to delete replace pair new pod %s/%s %s", - podInfo.replacePairNewPodInfo.Namespace, podInfo.replacePairNewPodInfo.Name, err) - return nil + if patchErr := u.Patch(u.ctx, podInfo.replacePairNewPodInfo.Pod, patch); patchErr != nil { + err := fmt.Errorf("failed to delete replace pair new pod %s/%s %s", + podInfo.replacePairNewPodInfo.Namespace, podInfo.replacePairNewPodInfo.Name, patchErr) + return err } } return nil @@ -797,34 +715,18 @@ func (u *replaceUpdatePodUpdater) BeginUpdate(resources *collasetutils.RelatedRe return succCount > 0, err } -func (u *replaceUpdatePodUpdater) FilterAllowOpsPodsAndUpdatePodContext(_ []*PodUpdateInfo, _ map[int]*appsv1alpha1.ContextDetail, _ *collasetutils.RelatedResources, _ chan *PodUpdateInfo) (requeueAfter *time.Duration, err error) { - return +func (u *replaceUpdatePodUpdater) FilterAllowOpsPods(podToUpdate []*PodUpdateInfo, _ map[int]*appsv1alpha1.ContextDetail, _ *collasetutils.RelatedResources, podCh chan *PodUpdateInfo) (requeueAfter *time.Duration, err error) { + for i, podInfo := range podToUpdate { + if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged && !podInfo.PvcTmpHashChanged { + continue + } + + podCh <- podToUpdate[i] + } + return nil, err } func (u *replaceUpdatePodUpdater) FulfillPodUpdatedInfo(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) (err error) { - //// when replaceUpdate, inPlaceUpdateSupport and onlyMetadataChanged always false - //// judge replace pair new pod is updated revision, if not, delete. - //if podUpdateInfo.replacePairNewPodInfo != nil { - // replacePairNewPod := podUpdateInfo.replacePairNewPodInfo.Pod - // newPodRevision, exist := replacePairNewPod.Labels[appsv1.ControllerRevisionHashLabelKey] - // if exist && newPodRevision == updatedRevision.Name { - // return - // } - // u.recorder.Eventf(podUpdateInfo.Pod, - // corev1.EventTypeNormal, - // "ReplaceUpdatePod", - // "label to-delete on new pair pod %s/%s because it is not updated revision, current revision: %s, updated revision: %s", - // replacePairNewPod.Namespace, - // replacePairNewPod.Name, - // newPodRevision, - // updatedRevision.Name) - // patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano()))) - // if err = u.Patch(u.ctx, podUpdateInfo.replacePairNewPodInfo.Pod, patch); err != nil { - // err = fmt.Errorf("failed to delete replace pair new pod %s/%s %s", - // podUpdateInfo.replacePairNewPodInfo.Namespace, podUpdateInfo.replacePairNewPodInfo.Name, err) - // return - // } - //} return } From 4ccdb0682a9d62992d94d829a579955108b8e65d Mon Sep 17 00:00:00 2001 From: dbug-dk <402230675@qq.com> Date: Thu, 11 Apr 2024 20:01:51 +0800 Subject: [PATCH 4/7] fix unittests error --- config/rbac/role.yaml | 2 +- pkg/controllers/collaset/synccontrol/sync_control.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 0b504561..e52ad07a 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -41,7 +41,7 @@ rules: - collasets/status verbs: - get - - patch + - patchu - update - apiGroups: - apps.kusionstack.io diff --git a/pkg/controllers/collaset/synccontrol/sync_control.go b/pkg/controllers/collaset/synccontrol/sync_control.go index 6606eea0..a115d53b 100644 --- a/pkg/controllers/collaset/synccontrol/sync_control.go +++ b/pkg/controllers/collaset/synccontrol/sync_control.go @@ -715,7 +715,7 @@ func (r *RealSyncControl) Update( // fulfillPodUpdateInfo to all not updatedRevision pod if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil { - logger.Error(err, fmt.Sprintf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name)) + logger.Error(err, fmt.Sprintf("fail to analyse pod %s/%s in-place update support", podInfo.Namespace, podInfo.Name)) continue } From e3230b50e87b669e84eaa33d495f80ccf21c50a3 Mon Sep 17 00:00:00 2001 From: dbug-dk <402230675@qq.com> Date: Thu, 11 Apr 2024 20:10:08 +0800 Subject: [PATCH 5/7] correct wrong rbac configuration for collaset_controller --- config/rbac/role.yaml | 2 +- pkg/controllers/collaset/collaset_controller.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index e52ad07a..0b504561 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -41,7 +41,7 @@ rules: - collasets/status verbs: - get - - patchu + - patch - update - apiGroups: - apps.kusionstack.io diff --git a/pkg/controllers/collaset/collaset_controller.go b/pkg/controllers/collaset/collaset_controller.go index fc2195b7..3a65ed98 100644 --- a/pkg/controllers/collaset/collaset_controller.go +++ b/pkg/controllers/collaset/collaset_controller.go @@ -108,7 +108,7 @@ func AddToMgr(mgr ctrl.Manager, r reconcile.Reconciler) error { } // +kubebuilder:rbac:groups=apps.kusionstack.io,resources=collasets,verbs=get;list;watch;create;update;patch;delete -// +kubebuilder:rbac:groups=apps.kusionstack.io,resources=collasets/status,verbs=get;update;patchu +// +kubebuilder:rbac:groups=apps.kusionstack.io,resources=collasets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=apps.kusionstack.io,resources=collasets/finalizers,verbs=update // +kubebuilder:rbac:groups=apps.kusionstack.io,resources=resourcecontexts,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps.kusionstack.io,resources=resourcecontexts/status,verbs=get;update;patch From fc14a9de60885762f024d5e5df37f0909f02e7c4 Mon Sep 17 00:00:00 2001 From: dbug-dk <402230675@qq.com> Date: Thu, 11 Apr 2024 20:24:22 +0800 Subject: [PATCH 6/7] format comments --- pkg/controllers/collaset/synccontrol/sync_control.go | 7 +++++-- pkg/controllers/collaset/synccontrol/update.go | 7 +++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/controllers/collaset/synccontrol/sync_control.go b/pkg/controllers/collaset/synccontrol/sync_control.go index a115d53b..7bce72fa 100644 --- a/pkg/controllers/collaset/synccontrol/sync_control.go +++ b/pkg/controllers/collaset/synccontrol/sync_control.go @@ -713,7 +713,7 @@ func (r *RealSyncControl) Update( continue } - // fulfillPodUpdateInfo to all not updatedRevision pod + // 3.1 fulfillPodUpdateInfo to all not updatedRevision pod if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil { logger.Error(err, fmt.Sprintf("fail to analyse pod %s/%s in-place update support", podInfo.Namespace, podInfo.Name)) continue @@ -726,10 +726,13 @@ func (r *RealSyncControl) Update( podCh <- podToUpdate[i] } + // 4. begin pod update lifecycle updating, err = updater.BeginUpdatePod(resources, podCh) if err != nil { return updating, recordedRequeueAfter, err } + + // 5. filter pods not allow to ops now, such as OperationDelaySeconds strategy recordedRequeueAfter, err = updater.FilterAllowOpsPods(podToUpdate, ownedIDs, resources, podCh) if err != nil { collasetutils.AddOrUpdateCondition(resources.NewStatus, @@ -767,7 +770,7 @@ func (r *RealSyncControl) Update( collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "") } - // try to finish all Pods'PodOpsLifecycle if its update is finished. + // 7. try to finish all Pods'PodOpsLifecycle if its update is finished. succCount, err = controllerutils.SlowStartBatch(len(podUpdateInfos), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error { podInfo := podUpdateInfos[i] diff --git a/pkg/controllers/collaset/synccontrol/update.go b/pkg/controllers/collaset/synccontrol/update.go index 5ceef05b..7d13e85f 100644 --- a/pkg/controllers/collaset/synccontrol/update.go +++ b/pkg/controllers/collaset/synccontrol/update.go @@ -351,7 +351,7 @@ func (u *GenericPodUpdater) FilterAllowOpsPods(podToUpdate []*PodUpdateInfo, own // if Pod has not been updated, update it. podCh <- podToUpdate[i] } - // 4. mark Pod to use updated revision before updating it. + // mark Pod to use updated revision before updating it. if needUpdateContext { u.recorder.Eventf(u.collaSet, corev1.EventTypeNormal, "UpdateToPodContext", "try to update ResourceContext for CollaSet") err := retry.RetryOnConflict(retry.DefaultRetry, func() error { @@ -363,7 +363,6 @@ func (u *GenericPodUpdater) FilterAllowOpsPods(podToUpdate []*PodUpdateInfo, own } func (u *GenericPodUpdater) FinishUpdatePod(podInfo *PodUpdateInfo) error { - //u.recorder.Eventf().V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod)) if updated, err := podopslifecycle.Finish(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil { return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) } else if updated { @@ -493,7 +492,7 @@ func (u *inPlaceIfPossibleUpdater) FulfillPodUpdatedInfo( func (u *inPlaceIfPossibleUpdater) UpgradePod(podInfo *PodUpdateInfo) error { if podInfo.OnlyMetadataChanged || podInfo.InPlaceUpdateSupport { - // 6.1 if pod template changes only include metadata or support in-place update, just apply these changes to pod directly + // if pod template changes only include metadata or support in-place update, just apply these changes to pod directly if err := u.podControl.UpdatePod(podInfo.UpdatedPod); err != nil { return fmt.Errorf("fail to update Pod %s/%s when updating by in-place: %s", podInfo.Namespace, podInfo.Name, err) } else { @@ -510,7 +509,7 @@ func (u *inPlaceIfPossibleUpdater) UpgradePod(podInfo *PodUpdateInfo) error { } } } else { - // 6.2 if pod has changes not in-place supported, recreate it + // if pod has changes not in-place supported, recreate it return recreatePod(u.collaSet, podInfo, u.podControl, u.recorder) } return nil From ba3463b79a2588111eb83ae253ccf87eaf03e1c0 Mon Sep 17 00:00:00 2001 From: dbug-dk <402230675@qq.com> Date: Wed, 17 Apr 2024 20:04:04 +0800 Subject: [PATCH 7/7] change diffPod method receiver to pointer, and return false in replaceUpdate GetPodUpdateFinishStatus when replace new pair pod is not exist --- pkg/controllers/collaset/synccontrol/update.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/collaset/synccontrol/update.go b/pkg/controllers/collaset/synccontrol/update.go index 7d13e85f..1d6c9bb2 100644 --- a/pkg/controllers/collaset/synccontrol/update.go +++ b/pkg/controllers/collaset/synccontrol/update.go @@ -534,7 +534,7 @@ func recreatePod(collaSet *appsv1alpha1.CollaSet, podInfo *PodUpdateInfo, podCon return nil } -func (u inPlaceIfPossibleUpdater) diffPod(currentPod, updatedPod *corev1.Pod) (inPlaceSetUpdateSupport bool, onlyMetadataChanged bool) { +func (u *inPlaceIfPossibleUpdater) diffPod(currentPod, updatedPod *corev1.Pod) (inPlaceSetUpdateSupport bool, onlyMetadataChanged bool) { if len(currentPod.Spec.Containers) != len(updatedPod.Spec.Containers) { return false, false } @@ -752,7 +752,7 @@ func (u *replaceUpdatePodUpdater) UpgradePod(podInfo *PodUpdateInfo) error { func (u *replaceUpdatePodUpdater) GetPodUpdateFinishStatus(podUpdateInfo *PodUpdateInfo) (finished bool, msg string, err error) { replaceNewPodInfo := podUpdateInfo.replacePairNewPodInfo if replaceNewPodInfo == nil { - return isPodUpdatedServiceAvailable(podUpdateInfo) + return } return isPodUpdatedServiceAvailable(replaceNewPodInfo)