Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor podUpdater interface #166

Merged
merged 8 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions pkg/controllers/collaset/collaset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ var _ = Describe("collaset controller", func() {
}
})

It("replace update reconcile by partition", func() {
It("[replace update] reconcile by partition", func() {
testcase := "test-replace-update-by-partition"
Expect(createNamespace(c, testcase)).Should(BeNil())

Expand Down Expand Up @@ -750,7 +750,7 @@ var _ = Describe("collaset controller", func() {
Expect(inDeleteReplicas).Should(BeEquivalentTo(4))
})

It("replace update reconcile by label", func() {
It("[replace update] reconcile by label", func() {
testcase := "test-replace-update-by-label"
Expect(createNamespace(c, testcase)).Should(BeNil())

Expand Down Expand Up @@ -986,7 +986,7 @@ var _ = Describe("collaset controller", func() {

// double check updated pod replicas
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
var replacePairNewId, newPodInstanceId string
var replacePairNewId, newPodInstanceId, newCreatePodName string
for _, pod := range podList.Items {
Expect(pod.Labels).ShouldNot(BeNil())
// replace by current revision
Expand All @@ -996,13 +996,40 @@ var _ = Describe("collaset controller", func() {
replacePairNewId = pod.Labels[appsv1alpha1.PodReplacePairNewId]
Expect(replacePairNewId).ShouldNot(BeNil())
} else {
newCreatePodName = pod.Name
newPodInstanceId = pod.Labels[appsv1alpha1.PodInstanceIDLabelKey]
Expect(pod.Labels[appsv1alpha1.PodReplacePairOriginName]).Should(BeEquivalentTo(replacePod.Name))
}
}
Expect(replacePairNewId).ShouldNot(BeEquivalentTo(""))
Expect(newPodInstanceId).ShouldNot(BeEquivalentTo(""))
Expect(newPodInstanceId).Should(BeEquivalentTo(replacePairNewId))
Expect(newCreatePodName).ShouldNot(BeEquivalentTo(""))

Expect(updatePodWithRetry(c, replacePod.Namespace, newCreatePodName, func(pod *corev1.Pod) bool {
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true"
return true
})).Should(BeNil())

Eventually(func() error {
// check updated pod replicas by CollaSet status
return expectedStatusReplicas(c, cs, 0, 0, 1, 2, 0, 0, 0, 0)
}, 30*time.Second, 1*time.Second).Should(BeNil())

Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for _, pod := range podList.Items {
Expect(pod.Labels).ShouldNot(BeNil())
// replace by current revision
Expect(pod.Labels[appsv1.ControllerRevisionHashLabelKey]).Should(BeEquivalentTo(cs.Status.CurrentRevision))
Expect(pod.Spec.Containers[0].Image).Should(BeEquivalentTo("nginx:v1"))
if pod.Name == replacePod.Name {
_, exist := pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]
Expect(exist).Should(BeTrue())
}
}
})

It("replace update change to inplaceUpdate", func() {
Expand Down
190 changes: 18 additions & 172 deletions pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,15 @@ func dealReplacePods(pods []*corev1.Pod, instance *appsv1alpha1.CollaSet) (needR
needCleanLabels = []string{appsv1alpha1.PodReplaceIndicationLabelKey, appsv1alpha1.PodReplaceByReplaceUpdateLabelKey}
}

// pod is replace new created pod, skip replace
if originPodName, exist := pod.Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
if _, exist := podNameMap[originPodName]; !exist {
// replace pair origin pod is not exist, clean label.
if originPod, exist := podNameMap[originPodName]; !exist {
needCleanLabels = append(needCleanLabels, appsv1alpha1.PodReplacePairOriginName)
} else if !replaceByUpdate {
// not replace update, delete origin pod when new created pod is service available
if _, serviceAvailable := pod.Labels[appsv1alpha1.PodServiceAvailableLabel]; serviceAvailable {
needDeletePods = append(needDeletePods, originPod)
}
}
}

Expand Down Expand Up @@ -663,124 +668,17 @@ func (r *RealSyncControl) Update(
// 2. decide Pod update candidates
podToUpdate := decidePodToUpdate(cls, podUpdateInfos)
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
updater := newPodUpdater(ctx, r.client, cls)
updating := false
analysedPod := sets.NewString()

if cls.Spec.UpdateStrategy.PodUpdatePolicy != appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType {
// 3. prepare Pods to begin PodOpsLifecycle
for i, podInfo := range podToUpdate {
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}

if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) {
continue
}
podCh <- podToUpdate[i]
}

// 4. begin podOpsLifecycle parallel

succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error {
podInfo := <-podCh
// fulfill Pod update information
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
}
analysedPod.Insert(podInfo.Name)
logger.V(1).Info("try to begin PodOpsLifecycle for updating Pod of CollaSet", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Begin(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) {
if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport {
return podopslifecycle.WhenBeginDelete(obj)
}
return false, nil
}); err != nil {
return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
}

return nil
})

updating = updating || succCount > 0
if err != nil {
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error())
return updating, nil, err
} else {
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "")
}

needUpdateContext := false
for i := range podToUpdate {
podInfo := podToUpdate[i]
requeueAfter, allowed := podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, realValue(cls.Spec.UpdateStrategy.OperationDelaySeconds), podInfo.Pod)
if !allowed {
r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "Pod %s is not allowed to update", commonutils.ObjectKeyString(podInfo.Pod))
continue
}
if requeueAfter != nil {
r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "delay Pod update for %d seconds", requeueAfter.Seconds())
if recordedRequeueAfter == nil || *requeueAfter < *recordedRequeueAfter {
recordedRequeueAfter = requeueAfter
}
continue
}

if !ownedIDs[podInfo.ID].Contains(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) {
needUpdateContext = true
ownedIDs[podInfo.ID].Put(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name)
}
if podInfo.PodDecorationChanged {
decorationStr := utilspoddecoration.GetDecorationInfoString(podInfo.UpdatedPodDecorations)
if val, ok := ownedIDs[podInfo.ID].Get(podcontext.PodDecorationRevisionKey); !ok || val != decorationStr {
needUpdateContext = true
ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr)
}
}
updater := newPodUpdater(ctx, r.client, cls, r.podControl, r.recorder)

if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}
// if Pod has not been updated, update it.
podCh <- podToUpdate[i]
}

// 5. mark Pod to use updated revision before updating it.
if needUpdateContext {
logger.V(1).Info("try to update ResourceContext for CollaSet")
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
return podcontext.UpdateToPodContext(r.client, cls, ownedIDs)
})

if err != nil {
collasetutils.AddOrUpdateCondition(resources.NewStatus,
appsv1alpha1.CollaSetScale, err, "UpdateFailed",
fmt.Sprintf("fail to update Context for updating: %s", err))
return updating, recordedRequeueAfter, err
} else {
collasetutils.AddOrUpdateCondition(resources.NewStatus,
appsv1alpha1.CollaSetScale, nil, "UpdateFailed", "")
}
}
} else {
for i, podInfo := range podToUpdate {
// The pod is in a "replaceUpdate" state, always requires further update processing.
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}
podCh <- podToUpdate[i]
}
updating, recordedRequeueAfter, err := updater.PrepareAndFilterPodUpdate(podToUpdate, resources, ownedIDs, podCh)
if err != nil {
return updating, recordedRequeueAfter, err
}

// 6. update Pod
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(_ int, _ error) error {
podInfo := <-podCh
if !analysedPod.Has(podInfo.Name) {
if !podInfo.analysed {
dbug-dk marked this conversation as resolved.
Show resolved Hide resolved
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
}
Expand All @@ -793,41 +691,9 @@ func (r *RealSyncControl) Update(
"inPlaceUpdate", podInfo.InPlaceUpdateSupport,
"onlyMetadataChanged", podInfo.OnlyMetadataChanged,
)
if podInfo.OnlyMetadataChanged || podInfo.InPlaceUpdateSupport {
// 6.1 if pod template changes only include metadata or support in-place update, just apply these changes to pod directly
if err = r.podControl.UpdatePod(podInfo.UpdatedPod); err != nil {
return fmt.Errorf("fail to update Pod %s/%s when updating by in-place: %s", podInfo.Namespace, podInfo.Name, err)
} else {
podInfo.Pod = podInfo.UpdatedPod
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdatePod",
"succeed to update Pod %s/%s to from revision %s to revision %s by in-place",
podInfo.Namespace, podInfo.Name,
podInfo.CurrentRevision.Name,
resources.UpdatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.UpdatedPod.ResourceVersion); err != nil {
return err
}
}
} else if cls.Spec.UpdateStrategy.PodUpdatePolicy == appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType {
return nil
} else {
// 6.2 if pod has changes not in-place supported, recreate it
if err = r.podControl.DeletePod(podInfo.Pod); err != nil {
return fmt.Errorf("fail to delete Pod %s/%s when updating by recreate: %s", podInfo.Namespace, podInfo.Name, err)
} else {
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdatePod",
"succeed to update Pod %s/%s to from revision %s to revision %s by recreate",
podInfo.Namespace,
podInfo.Name,
podInfo.CurrentRevision.Name, resources.UpdatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectDelete(cls, expectations.Pod, podInfo.Name); err != nil {
return err
}
}

if err = updater.UpgradePod(podInfo); err != nil {
return err
}

return nil
Expand Down Expand Up @@ -856,29 +722,9 @@ func (r *RealSyncControl) Update(
}

if finished {
if podInfo.isInReplacing {
replacePairNewPodInfo := podInfo.replacePairNewPodInfo
if replacePairNewPodInfo != nil {
if _, exist := replacePairNewPodInfo.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]; !exist {
patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano())))
if err = r.podControl.PatchPod(podInfo.Pod, patch); err != nil {
return fmt.Errorf("failed to delete replace pair origin pod %s/%s %s", podInfo.Namespace, podInfo.replacePairNewPodInfo.Name, err)
}
}
}
} else {
logger.V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Finish(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
}
err := updater.FinishUpdatePod(podInfo)
if err != nil {
return err
}
} else {
r.recorder.Eventf(podInfo.Pod,
Expand Down
Loading
Loading