Skip to content

Commit

Permalink
1. refactor podUpdater interface.
Browse files Browse the repository at this point in the history
2. fix bug: when a pod is replaced via the to-replace label and the pod update policy is not replace update, the new pod's status was not checked
  • Loading branch information
dbug-dk committed Mar 15, 2024
1 parent 8087ea2 commit a3f2782
Show file tree
Hide file tree
Showing 3 changed files with 389 additions and 195 deletions.
33 changes: 30 additions & 3 deletions pkg/controllers/collaset/collaset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ var _ = Describe("collaset controller", func() {
}
})

It("replace update reconcile by partition", func() {
It("[replace update] reconcile by partition", func() {
testcase := "test-replace-update-by-partition"
Expect(createNamespace(c, testcase)).Should(BeNil())

Expand Down Expand Up @@ -750,7 +750,7 @@ var _ = Describe("collaset controller", func() {
Expect(inDeleteReplicas).Should(BeEquivalentTo(4))
})

It("replace update reconcile by label", func() {
It("[replace update] reconcile by label", func() {
testcase := "test-replace-update-by-label"
Expect(createNamespace(c, testcase)).Should(BeNil())

Expand Down Expand Up @@ -986,7 +986,7 @@ var _ = Describe("collaset controller", func() {

// double check updated pod replicas
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
var replacePairNewId, newPodInstanceId string
var replacePairNewId, newPodInstanceId, newCreatePodName string
for _, pod := range podList.Items {
Expect(pod.Labels).ShouldNot(BeNil())
// replace by current revision
Expand All @@ -996,13 +996,40 @@ var _ = Describe("collaset controller", func() {
replacePairNewId = pod.Labels[appsv1alpha1.PodReplacePairNewId]
Expect(replacePairNewId).ShouldNot(BeNil())
} else {
newCreatePodName = pod.Name
newPodInstanceId = pod.Labels[appsv1alpha1.PodInstanceIDLabelKey]
Expect(pod.Labels[appsv1alpha1.PodReplacePairOriginName]).Should(BeEquivalentTo(replacePod.Name))
}
}
Expect(replacePairNewId).ShouldNot(BeEquivalentTo(""))
Expect(newPodInstanceId).ShouldNot(BeEquivalentTo(""))
Expect(newPodInstanceId).Should(BeEquivalentTo(replacePairNewId))
Expect(newCreatePodName).ShouldNot(BeEquivalentTo(""))

Expect(updatePodWithRetry(c, replacePod.Namespace, newCreatePodName, func(pod *corev1.Pod) bool {
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true"
return true
})).Should(BeNil())

Eventually(func() error {
// check updated pod replicas by CollaSet status
return expectedStatusReplicas(c, cs, 0, 0, 1, 2, 0, 0, 0, 0)
}, 30*time.Second, 1*time.Second).Should(BeNil())

Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for _, pod := range podList.Items {
Expect(pod.Labels).ShouldNot(BeNil())
// replace by current revision
Expect(pod.Labels[appsv1.ControllerRevisionHashLabelKey]).Should(BeEquivalentTo(cs.Status.CurrentRevision))
Expect(pod.Spec.Containers[0].Image).Should(BeEquivalentTo("nginx:v1"))
if pod.Name == replacePod.Name {
_, exist := pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]
Expect(exist).Should(BeTrue())
}
}
})

It("replace update change to inplaceUpdate", func() {
Expand Down
190 changes: 18 additions & 172 deletions pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,15 @@ func dealReplacePods(pods []*corev1.Pod, instance *appsv1alpha1.CollaSet) (needR
needCleanLabels = []string{appsv1alpha1.PodReplaceIndicationLabelKey, appsv1alpha1.PodReplaceByReplaceUpdateLabelKey}
}

// pod is the newly created pod of a replace pair, skip replace
if originPodName, exist := pod.Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
if _, exist := podNameMap[originPodName]; !exist {
// the replace pair's origin pod does not exist, clean the label.
if originPod, exist := podNameMap[originPodName]; !exist {
needCleanLabels = append(needCleanLabels, appsv1alpha1.PodReplacePairOriginName)
} else if !replaceByUpdate {
// not a replace update: delete the origin pod once the newly created pod is service available
if _, serviceAvailable := pod.Labels[appsv1alpha1.PodServiceAvailableLabel]; serviceAvailable {
needDeletePods = append(needDeletePods, originPod)
}
}
}

Expand Down Expand Up @@ -663,124 +668,17 @@ func (r *RealSyncControl) Update(
// 2. decide Pod update candidates
podToUpdate := decidePodToUpdate(cls, podUpdateInfos)
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
updater := newPodUpdater(ctx, r.client, cls)
updating := false
analysedPod := sets.NewString()

if cls.Spec.UpdateStrategy.PodUpdatePolicy != appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType {
// 3. prepare Pods to begin PodOpsLifecycle
for i, podInfo := range podToUpdate {
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}

if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) {
continue
}
podCh <- podToUpdate[i]
}

// 4. begin podOpsLifecycle parallel

succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error {
podInfo := <-podCh
// fulfill Pod update information
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
}
analysedPod.Insert(podInfo.Name)
logger.V(1).Info("try to begin PodOpsLifecycle for updating Pod of CollaSet", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Begin(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) {
if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport {
return podopslifecycle.WhenBeginDelete(obj)
}
return false, nil
}); err != nil {
return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
}

return nil
})

updating = updating || succCount > 0
if err != nil {
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error())
return updating, nil, err
} else {
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "")
}

needUpdateContext := false
for i := range podToUpdate {
podInfo := podToUpdate[i]
requeueAfter, allowed := podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, realValue(cls.Spec.UpdateStrategy.OperationDelaySeconds), podInfo.Pod)
if !allowed {
r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "Pod %s is not allowed to update", commonutils.ObjectKeyString(podInfo.Pod))
continue
}
if requeueAfter != nil {
r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "delay Pod update for %d seconds", requeueAfter.Seconds())
if recordedRequeueAfter == nil || *requeueAfter < *recordedRequeueAfter {
recordedRequeueAfter = requeueAfter
}
continue
}

if !ownedIDs[podInfo.ID].Contains(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) {
needUpdateContext = true
ownedIDs[podInfo.ID].Put(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name)
}
if podInfo.PodDecorationChanged {
decorationStr := utilspoddecoration.GetDecorationInfoString(podInfo.UpdatedPodDecorations)
if val, ok := ownedIDs[podInfo.ID].Get(podcontext.PodDecorationRevisionKey); !ok || val != decorationStr {
needUpdateContext = true
ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr)
}
}
updater := newPodUpdater(ctx, r.client, cls, r.podControl, r.recorder)

if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}
// if Pod has not been updated, update it.
podCh <- podToUpdate[i]
}

// 5. mark Pod to use updated revision before updating it.
if needUpdateContext {
logger.V(1).Info("try to update ResourceContext for CollaSet")
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
return podcontext.UpdateToPodContext(r.client, cls, ownedIDs)
})

if err != nil {
collasetutils.AddOrUpdateCondition(resources.NewStatus,
appsv1alpha1.CollaSetScale, err, "UpdateFailed",
fmt.Sprintf("fail to update Context for updating: %s", err))
return updating, recordedRequeueAfter, err
} else {
collasetutils.AddOrUpdateCondition(resources.NewStatus,
appsv1alpha1.CollaSetScale, nil, "UpdateFailed", "")
}
}
} else {
for i, podInfo := range podToUpdate {
// The pod is in a "replaceUpdate" state, always requires further update processing.
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}
podCh <- podToUpdate[i]
}
updating, recordedRequeueAfter, err := updater.PrepareAndFilterPodUpdate(podToUpdate, resources, ownedIDs, podCh)
if err != nil {
return updating, recordedRequeueAfter, err
}

// 6. update Pod
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(_ int, _ error) error {
podInfo := <-podCh
if !analysedPod.Has(podInfo.Name) {
if !podInfo.analysed {
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
}
Expand All @@ -793,41 +691,9 @@ func (r *RealSyncControl) Update(
"inPlaceUpdate", podInfo.InPlaceUpdateSupport,
"onlyMetadataChanged", podInfo.OnlyMetadataChanged,
)
if podInfo.OnlyMetadataChanged || podInfo.InPlaceUpdateSupport {
// 6.1 if pod template changes only include metadata or support in-place update, just apply these changes to pod directly
if err = r.podControl.UpdatePod(podInfo.UpdatedPod); err != nil {
return fmt.Errorf("fail to update Pod %s/%s when updating by in-place: %s", podInfo.Namespace, podInfo.Name, err)
} else {
podInfo.Pod = podInfo.UpdatedPod
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdatePod",
"succeed to update Pod %s/%s to from revision %s to revision %s by in-place",
podInfo.Namespace, podInfo.Name,
podInfo.CurrentRevision.Name,
resources.UpdatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.UpdatedPod.ResourceVersion); err != nil {
return err
}
}
} else if cls.Spec.UpdateStrategy.PodUpdatePolicy == appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType {
return nil
} else {
// 6.2 if pod has changes not in-place supported, recreate it
if err = r.podControl.DeletePod(podInfo.Pod); err != nil {
return fmt.Errorf("fail to delete Pod %s/%s when updating by recreate: %s", podInfo.Namespace, podInfo.Name, err)
} else {
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdatePod",
"succeed to update Pod %s/%s to from revision %s to revision %s by recreate",
podInfo.Namespace,
podInfo.Name,
podInfo.CurrentRevision.Name, resources.UpdatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectDelete(cls, expectations.Pod, podInfo.Name); err != nil {
return err
}
}

if err = updater.UpgradePod(podInfo); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -856,29 +722,9 @@ func (r *RealSyncControl) Update(
}

if finished {
if podInfo.isInReplacing {
replacePairNewPodInfo := podInfo.replacePairNewPodInfo
if replacePairNewPodInfo != nil {
if _, exist := replacePairNewPodInfo.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]; !exist {
patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano())))
if err = r.podControl.PatchPod(podInfo.Pod, patch); err != nil {
return fmt.Errorf("failed to delete replace pair origin pod %s/%s %s", podInfo.Namespace, podInfo.replacePairNewPodInfo.Name, err)
}
}
}
} else {
logger.V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Finish(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
}
err := updater.FinishUpdatePod(podInfo)
if err != nil {
return err
}
} else {
r.recorder.Eventf(podInfo.Pod,
Expand Down
Loading

0 comments on commit a3f2782

Please sign in to comment.