Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor podUpdater interface #166

Merged
merged 8 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions pkg/controllers/collaset/collaset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ var _ = Describe("collaset controller", func() {
}
})

It("replace update reconcile by partition", func() {
It("[replace update] reconcile by partition", func() {
testcase := "test-replace-update-by-partition"
Expect(createNamespace(c, testcase)).Should(BeNil())

Expand Down Expand Up @@ -753,7 +753,7 @@ var _ = Describe("collaset controller", func() {
Expect(inDeleteReplicas).Should(BeEquivalentTo(4))
})

It("replace update reconcile by label", func() {
It("[replace update] reconcile by label", func() {
testcase := "test-replace-update-by-label"
Expect(createNamespace(c, testcase)).Should(BeNil())

Expand Down Expand Up @@ -989,7 +989,7 @@ var _ = Describe("collaset controller", func() {

// double check updated pod replicas
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
var replacePairNewId, newPodInstanceId string
var replacePairNewId, newPodInstanceId, newCreatePodName string
for _, pod := range podList.Items {
Expect(pod.Labels).ShouldNot(BeNil())
// replace by current revision
Expand All @@ -999,13 +999,40 @@ var _ = Describe("collaset controller", func() {
replacePairNewId = pod.Labels[appsv1alpha1.PodReplacePairNewId]
Expect(replacePairNewId).ShouldNot(BeNil())
} else {
newCreatePodName = pod.Name
newPodInstanceId = pod.Labels[appsv1alpha1.PodInstanceIDLabelKey]
Expect(pod.Labels[appsv1alpha1.PodReplacePairOriginName]).Should(BeEquivalentTo(replacePod.Name))
}
}
Expect(replacePairNewId).ShouldNot(BeEquivalentTo(""))
Expect(newPodInstanceId).ShouldNot(BeEquivalentTo(""))
Expect(newPodInstanceId).Should(BeEquivalentTo(replacePairNewId))
Expect(newCreatePodName).ShouldNot(BeEquivalentTo(""))

Expect(updatePodWithRetry(c, replacePod.Namespace, newCreatePodName, func(pod *corev1.Pod) bool {
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true"
return true
})).Should(BeNil())

Eventually(func() error {
// check updated pod replicas by CollaSet status
return expectedStatusReplicas(c, cs, 0, 0, 1, 2, 0, 0, 0, 0)
}, 30*time.Second, 1*time.Second).Should(BeNil())

Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for _, pod := range podList.Items {
Expect(pod.Labels).ShouldNot(BeNil())
// replace by current revision
Expect(pod.Labels[appsv1.ControllerRevisionHashLabelKey]).Should(BeEquivalentTo(cs.Status.CurrentRevision))
Expect(pod.Spec.Containers[0].Image).Should(BeEquivalentTo("nginx:v1"))
if pod.Name == replacePod.Name {
_, exist := pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]
Expect(exist).Should(BeTrue())
}
}
})

It("replace update change to inplaceUpdate", func() {
Expand Down
217 changes: 42 additions & 175 deletions pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,14 @@ func dealReplacePods(pods []*corev1.Pod, instance *appsv1alpha1.CollaSet) (needR

// pod is replace new created pod, skip replace
if originPodName, exist := pod.Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
if _, exist := podNameMap[originPodName]; !exist {
// replace pair origin pod is not exist, clean label.
if originPod, exist := podNameMap[originPodName]; !exist {
needCleanLabels = append(needCleanLabels, appsv1alpha1.PodReplacePairOriginName)
} else if !replaceByUpdate {
// not replace update, delete origin pod when new created pod is service available
if _, serviceAvailable := pod.Labels[appsv1alpha1.PodServiceAvailableLabel]; serviceAvailable {
needDeletePods = append(needDeletePods, originPod)
}
}
}

Expand Down Expand Up @@ -690,143 +696,57 @@ func (r *RealSyncControl) Update(
logger := r.logger.WithValues("collaset", commonutils.ObjectKeyString(cls))
var recordedRequeueAfter *time.Duration
// 1. scan and analysis pods update info
podUpdateInfos, err := attachPodUpdateInfo(ctx, podWrappers, resources)
podUpdateInfos, err := attachPodUpdateInfo(ctx, cls, podWrappers, resources)
if err != nil {
return false, nil, fmt.Errorf("fail to attach pod update info, %v", err)
}
for _, podInfo := range podUpdateInfos {
// if template is updated, update pod by recreate
podInfo.PvcTmpHashChanged, err = pvccontrol.IsPodPvcTmpChanged(cls, podInfo.PodWrapper.Pod, resources.ExistingPvcs)
if err != nil {
return false, nil, fmt.Errorf("fail to check pvc template changed, %v", err)
}
}

// 2. decide Pod update candidates
podToUpdate := decidePodToUpdate(cls, podUpdateInfos)
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
updater := newPodUpdater(ctx, r.client, cls)
updater := newPodUpdater(ctx, r.client, cls, r.podControl, r.recorder)
updating := false
analysedPod := sets.NewString()

if cls.Spec.UpdateStrategy.PodUpdatePolicy != appsv1alpha1.CollaSetReplacePodUpdateStrategyType {
// 3. prepare Pods to begin PodOpsLifecycle
for i, podInfo := range podToUpdate {
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}

if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) {
continue
}
podCh <- podToUpdate[i]
// 3. filter already updated revision,
for i, podInfo := range podToUpdate {
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged && !podInfo.PvcTmpHashChanged {
continue
}

// 4. begin podOpsLifecycle parallel

succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error {
podInfo := <-podCh
// fulfill Pod update information
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
}
analysedPod.Insert(podInfo.Name)
logger.V(1).Info("try to begin PodOpsLifecycle for updating Pod of CollaSet", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Begin(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) {
if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport {
return podopslifecycle.WhenBeginDelete(obj)
}
return false, nil
}); err != nil {
return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
}
return nil
})

updating = updating || succCount > 0
if err != nil {
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error())
return updating, nil, err
} else {
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "")
// 3.1 fulfillPodUpdateInfo to all not updatedRevision pod
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
logger.Error(err, fmt.Sprintf("fail to analyse pod %s/%s in-place update support", podInfo.Namespace, podInfo.Name))
continue
}

needUpdateContext := false
for i := range podToUpdate {
podInfo := podToUpdate[i]
requeueAfter, allowed := podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, realValue(cls.Spec.UpdateStrategy.OperationDelaySeconds), podInfo.Pod)
if !allowed {
r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "Pod %s is not allowed to update", commonutils.ObjectKeyString(podInfo.Pod))
continue
}
if requeueAfter != nil {
r.recorder.Eventf(podInfo, corev1.EventTypeNormal, "PodUpdateLifecycle", "delay Pod update for %d seconds", requeueAfter.Seconds())
if recordedRequeueAfter == nil || *requeueAfter < *recordedRequeueAfter {
recordedRequeueAfter = requeueAfter
}
continue
}

if !ownedIDs[podInfo.ID].Contains(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name) {
needUpdateContext = true
ownedIDs[podInfo.ID].Put(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name)
}
if podInfo.PodDecorationChanged {
decorationStr := utilspoddecoration.GetDecorationInfoString(podInfo.UpdatedPodDecorations)
if val, ok := ownedIDs[podInfo.ID].Get(podcontext.PodDecorationRevisionKey); !ok || val != decorationStr {
needUpdateContext = true
ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr)
}
}

if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}
// if Pod has not been updated, update it.
podCh <- podToUpdate[i]
if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) {
continue
}

// 5. mark Pod to use updated revision before updating it.
if needUpdateContext {
logger.V(1).Info("try to update ResourceContext for CollaSet")
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
return podcontext.UpdateToPodContext(r.client, cls, ownedIDs)
})
podCh <- podToUpdate[i]
}

if err != nil {
collasetutils.AddOrUpdateCondition(resources.NewStatus,
appsv1alpha1.CollaSetScale, err, "UpdateFailed",
fmt.Sprintf("fail to update Context for updating: %s", err))
return updating, recordedRequeueAfter, err
} else {
collasetutils.AddOrUpdateCondition(resources.NewStatus,
appsv1alpha1.CollaSetScale, nil, "UpdateFailed", "")
}
}
// 4. begin pod update lifecycle
updating, err = updater.BeginUpdatePod(resources, podCh)
if err != nil {
return updating, recordedRequeueAfter, err
}

// 5. filter pods not allow to ops now, such as OperationDelaySeconds strategy
recordedRequeueAfter, err = updater.FilterAllowOpsPods(podToUpdate, ownedIDs, resources, podCh)
if err != nil {
collasetutils.AddOrUpdateCondition(resources.NewStatus,
appsv1alpha1.CollaSetScale, err, "UpdateFailed",
fmt.Sprintf("fail to update Context for updating: %s", err))
return updating, recordedRequeueAfter, err
} else {
for i, podInfo := range podToUpdate {
// The pod is in a "replaceUpdate" state, always requires further update processing.
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged && !podInfo.PvcTmpHashChanged {
continue
}
podCh <- podToUpdate[i]
}
collasetutils.AddOrUpdateCondition(resources.NewStatus,
appsv1alpha1.CollaSetScale, nil, "Updated", "")
}

// 6. update Pod
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(_ int, _ error) error {
podInfo := <-podCh
if !analysedPod.Has(podInfo.Name) {
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
}
}

logger.V(1).Info("before pod update operation",
"pod", commonutils.ObjectKeyString(podInfo.Pod),
"revision.from", podInfo.CurrentRevision.Name,
Expand All @@ -835,41 +755,8 @@ func (r *RealSyncControl) Update(
"onlyMetadataChanged", podInfo.OnlyMetadataChanged,
)

if (podInfo.OnlyMetadataChanged || podInfo.InPlaceUpdateSupport) && !podInfo.PvcTmpHashChanged {
// 6.1 if pod template changes only include metadata or support in-place update, just apply these changes to pod directly
if err = r.podControl.UpdatePod(podInfo.UpdatedPod); err != nil {
return fmt.Errorf("fail to update Pod %s/%s when updating by in-place: %s", podInfo.Namespace, podInfo.Name, err)
} else {
podInfo.Pod = podInfo.UpdatedPod
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdatePod",
"succeed to update Pod %s/%s to from revision %s to revision %s by in-place",
podInfo.Namespace, podInfo.Name,
podInfo.CurrentRevision.Name,
resources.UpdatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.UpdatedPod.ResourceVersion); err != nil {
return err
}
}
} else if cls.Spec.UpdateStrategy.PodUpdatePolicy == appsv1alpha1.CollaSetReplacePodUpdateStrategyType {
return nil
} else {
// 6.2 if pod has changes not in-place supported, recreate it
if err = r.podControl.DeletePod(podInfo.Pod); err != nil {
return fmt.Errorf("fail to delete Pod %s/%s when updating by recreate: %s", podInfo.Namespace, podInfo.Name, err)
} else {
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdatePod",
"succeed to update Pod %s/%s to from revision %s to revision %s by recreate",
podInfo.Namespace,
podInfo.Name,
podInfo.CurrentRevision.Name, resources.UpdatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectDelete(cls, expectations.Pod, podInfo.Name); err != nil {
return err
}
}
if err = updater.UpgradePod(podInfo); err != nil {
return err
}

return nil
Expand All @@ -883,7 +770,7 @@ func (r *RealSyncControl) Update(
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "")
}

// try to finish all Pods'PodOpsLifecycle if its update is finished.
// 7. try to finish all Pods'PodOpsLifecycle if its update is finished.
succCount, err = controllerutils.SlowStartBatch(len(podUpdateInfos), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
podInfo := podUpdateInfos[i]

Expand All @@ -898,29 +785,9 @@ func (r *RealSyncControl) Update(
}

if finished {
if podInfo.isInReplacing {
replacePairNewPodInfo := podInfo.replacePairNewPodInfo
if replacePairNewPodInfo != nil {
if _, exist := replacePairNewPodInfo.Labels[appsv1alpha1.PodDeletionIndicationLabelKey]; !exist {
patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%d"}}}`, appsv1alpha1.PodDeletionIndicationLabelKey, time.Now().UnixNano())))
if err = r.podControl.PatchPod(podInfo.Pod, patch); err != nil {
return fmt.Errorf("failed to delete replace pair origin pod %s/%s %s", podInfo.Namespace, podInfo.replacePairNewPodInfo.Name, err)
}
}
}
} else {
logger.V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Finish(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
}
err := updater.FinishUpdatePod(podInfo)
if err != nil {
return err
}
} else {
r.recorder.Eventf(podInfo.Pod,
Expand Down
Loading
Loading