Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support clean old lifecycle before new update #295

Merged
merged 6 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 109 additions & 17 deletions pkg/controllers/collaset/collaset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,9 @@ var _ = Describe("collaset controller", func() {
}, 5*time.Second, 1*time.Second).Should(BeTrue())

Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for i := range podList.Items {
Expect(cleanPodLifecycleLabels(c, podList.Items[i].Namespace, podList.Items[i].Name)).Should(BeNil())
}
for _, number := range []int32{1, 2, 3, 4} {
pod := podList.Items[number-1]
// label pod to trigger update
Expand All @@ -625,10 +628,24 @@ var _ = Describe("collaset controller", func() {
return true
})).Should(BeNil())

Eventually(func() bool {
Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, &pod)).Should(BeNil())
if !podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, &pod) {
return false
}
// allow Pod to do update
Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.UpdateOpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())
return true
}, 10*time.Second, 1*time.Second).Should(BeTrue())

Eventually(func() error {
// check updated pod replicas by CollaSet status
return expectedStatusReplicas(c, cs, 0, 0, 0, 4, number, 1, 0, 0)
}, 5*time.Second, 1*time.Second).Should(BeNil())
}, 10*time.Second, 1*time.Second).Should(BeNil())

Expect(updatePodStatusWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
pod.Status.ContainerStatuses = []corev1.ContainerStatus{
Expand Down Expand Up @@ -957,6 +974,16 @@ var _ = Describe("collaset controller", func() {
return true
})).Should(BeNil())

Eventually(func() bool {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for k := range podList.Items[0].Labels {
if strings.HasPrefix(k, appsv1alpha1.PodOperatingLabelPrefix) {
return true
}
}
return false
}, 5*time.Second, 1*time.Second).Should(BeTrue())

// allow Pod to update
Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.UpdateOpsLifecycleAdapter.GetID())
Expand Down Expand Up @@ -1069,15 +1096,27 @@ var _ = Describe("collaset controller", func() {
return true
}))

for i := range podList.Items {
pod := &podList.Items[i]
// allow Pod to update
Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.UpdateOpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = "true"
return true
})).Should(BeNil())
}
Eventually(func() int {
count := 0
for i := range podList.Items {
pod := &podList.Items[i]
Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, pod)).Should(BeNil())
for k := range pod.Labels {
if strings.HasPrefix(k, appsv1alpha1.PodOperatingLabelPrefix) {
count++
// allow Pod to update
Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.UpdateOpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = "true"
return true
})).Should(BeNil())
}
}

}
return count
}, 5*time.Second, 1*time.Second).Should(BeEquivalentTo(1))

Eventually(func() error {
// check updated pod replicas by CollaSet status
return expectedStatusReplicas(c, cs, 0, 0, 0, 2, 1, 1, 0, 0)
Expand Down Expand Up @@ -2082,8 +2121,7 @@ var _ = Describe("collaset controller", func() {

Eventually(func() bool {
pod := &corev1.Pod{}
error := c.Get(context.TODO(), types.NamespacedName{Namespace: originPod1.Namespace, Name: originPod1.Name}, pod)
Expect(error).Should(BeNil())
Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: originPod1.Namespace, Name: originPod1.Name}, pod)).Should(BeNil())
Expect(pod.Labels).ShouldNot(BeNil())
_, replaceIndicate := pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]
_, replaceByUpdate := pod.Labels[appsv1alpha1.PodReplaceByReplaceUpdateLabelKey]
Expand All @@ -2101,14 +2139,21 @@ var _ = Describe("collaset controller", func() {
})).Should(BeNil())

// allow originPod2 to do inPlace update
Eventually(func() bool {
Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: originPod2.Namespace, Name: originPod2.Name}, &originPod2)).Should(BeNil())
for k := range originPod2.Labels {
if strings.HasPrefix(k, appsv1alpha1.PodOperatingLabelPrefix) {
return true
}
}
return false
}, time.Second*10, time.Second).Should(BeTrue())
Expect(updatePodWithRetry(c, originPod2.Namespace, originPod2.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.UpdateOpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())

time.Sleep(3 * time.Second)

Eventually(func() error {
// check updated pod replicas by CollaSet status
return expectedStatusReplicas(c, cs, 0, 0, 0, 3, 2, 2, 0, 0)
Expand Down Expand Up @@ -3686,7 +3731,7 @@ var _ = Describe("collaset controller", func() {
return true
})).Should(BeNil())

// included pod should not have ownerReference
// included pod should have ownerReference
Eventually(func() bool {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
podIncluded := false
Expand All @@ -3703,7 +3748,7 @@ var _ = Describe("collaset controller", func() {
return podIncluded
}, 10*time.Second, 1*time.Second).Should(BeTrue())

// included pvc should not have ownerReference
// included pvc should have ownerReference
Eventually(func() bool {
pvcIncluded := false
Expect(c.List(context.TODO(), pvcList, client.InNamespace(cs.Namespace))).Should(BeNil())
Expand Down Expand Up @@ -3738,6 +3783,16 @@ var _ = Describe("collaset controller", func() {
return true
})).Should(BeNil())

Eventually(func() bool {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
for k := range podList.Items[0].Labels {
if strings.HasPrefix(k, appsv1alpha1.PodOperatingLabelPrefix) {
return true
}
}
return false
}, 5*time.Second, 1*time.Second).Should(BeTrue())

// allow pod to update
Expect(c.List(context.TODO(), pvcList, client.InNamespace(cs.Namespace))).Should(BeNil())
for i := range podList.Items {
Expand Down Expand Up @@ -3912,17 +3967,27 @@ var _ = Describe("collaset controller", func() {
},
}

Expect(c.Create(context.TODO(), podDecoration)).Should(BeNil())

// allow Pod to do update
for i := range podList.Items {
pod := podList.Items[i]
Eventually(func() bool {
Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, &pod)).Should(BeNil())
for k := range pod.Labels {
if strings.HasPrefix(k, appsv1alpha1.PodOperatingLabelPrefix) {
return true
}
}
return false
}, time.Second*10, time.Second).Should(BeTrue())
Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.UpdateOpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())
}

Expect(c.Create(context.TODO(), podDecoration)).Should(BeNil())
Eventually(func() int32 {
err := c.Get(context.TODO(), types.NamespacedName{Namespace: podDecoration.Namespace, Name: podDecoration.Name}, podDecoration)
if !errors.IsNotFound(err) {
Expand Down Expand Up @@ -4165,6 +4230,33 @@ func updatePodStatusWithRetry(c client.Client, namespace, name string, updateFn
})
}

// cleanPodLifecycleLabels removes every label whose key starts with one of
// appsv1alpha1.WellKnownLabelPrefixesWithID from the Pod namespace/name and
// writes the Pod back with c.Update, retrying on update conflicts.
//
// The Pod is fetched and its labels are filtered inside the retry closure, so
// each attempt derives the label set from the object it just read instead of
// from a snapshot taken before the first attempt. A stale snapshot would
// silently discard labels written to the Pod between attempts.
//
// It returns the Get or Update error that ended the retry loop, or nil.
func cleanPodLifecycleLabels(c client.Client, namespace, name string) error {
	key := types.NamespacedName{Namespace: namespace, Name: name}
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		pod := &corev1.Pod{}
		if err := c.Get(context.TODO(), key, pod); err != nil {
			return err
		}

		kept := make(map[string]string, len(pod.Labels))
	nextLabel:
		for k, v := range pod.Labels {
			for _, prefix := range appsv1alpha1.WellKnownLabelPrefixesWithID {
				if strings.HasPrefix(k, prefix) {
					// lifecycle label: drop it
					continue nextLabel
				}
			}
			kept[k] = v
		}

		pod.Labels = kept
		return c.Update(context.TODO(), pod)
	})
}

func testReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request) {
requests := make(chan reconcile.Request, 5)
fn := reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@ func (r *RealSyncControl) Update(
"UpdatePodCanceled",
"pod %s/%s with revision %s update is canceled due to not started and not included by partition",
podInfo.Namespace, podInfo.Name, podInfo.CurrentRevision.Name)
return ojutils.CancelOpsLifecycle(ctx, r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod)
return ojutils.CancelOpsLifecycle(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod)
}
// not allowedOps, skip GetPodUpdateFinishStatus
return nil
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/collaset/synccontrol/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ func (u *GenericPodUpdater) BeginUpdatePod(_ context.Context, resources *collase
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error {
podInfo := <-podCh
u.Recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "PodUpdateLifecycle", "try to begin PodOpsLifecycle for updating Pod of CollaSet")
if updated, err := podopslifecycle.Begin(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) {

if updated, err := podopslifecycle.BeginWithCleaningOld(u.Client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) {
if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport {
return podopslifecycle.WhenBeginDelete(obj)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/operationjob/operationjob_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@
lifecycleAdapter := NewLifecycleAdapter(operationJob.Name, operationJob.Spec.Action)

if forced {
err := ojutils.CancelOpsLifecycle(ctx, r.Client, lifecycleAdapter, candidate.Pod)
err := ojutils.CancelOpsLifecycle(r.Client, lifecycleAdapter, candidate.Pod)

Check warning on line 326 in pkg/controllers/operationjob/operationjob_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/operationjob/operationjob_manager.go#L326

Added line #L326 was not covered by tests
if err != nil {
return err
}
Expand Down
13 changes: 3 additions & 10 deletions pkg/controllers/operationjob/utils/lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ limitations under the License.
package utils

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
appsv1alpha1 "kusionstack.io/kube-api/apps/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/client"

podopslifecycleutil "kusionstack.io/kuperator/pkg/controllers/podopslifecycle"
Expand Down Expand Up @@ -66,22 +64,17 @@ func FinishOperateLifecycle(client client.Client, adapter podopslifecycle.Lifecy
return nil
}

func CancelOpsLifecycle(ctx context.Context, client client.Client, adapter podopslifecycle.LifecycleAdapter, pod *corev1.Pod) error {
func CancelOpsLifecycle(client client.Client, adapter podopslifecycle.LifecycleAdapter, pod *corev1.Pod) error {
if pod == nil {
return nil
}

// only cancel when lifecycle exist on pod
if exist, err := podopslifecycleutil.IsLifecycleOnPod(adapter.GetID(), pod); err != nil {
return err
return fmt.Errorf("fail to check %s PodOpsLifecycle on Pod %s/%s: %s", adapter.GetID(), pod.Namespace, pod.Name, err)
} else if !exist {
return nil
}

labelUndo := fmt.Sprintf("%s/%s", appsv1alpha1.PodUndoOperationTypeLabelPrefix, adapter.GetID())
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
pod.Labels[labelUndo] = string(adapter.GetType())
return client.Update(ctx, pod)
return podopslifecycle.Undo(client, adapter, pod)
}
1 change: 0 additions & 1 deletion pkg/controllers/podopslifecycle/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"

"kusionstack.io/kube-api/apps/v1alpha1"
)

Expand Down
26 changes: 26 additions & 0 deletions pkg/controllers/utils/podopslifecycle/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"

"kusionstack.io/kube-api/apps/v1alpha1"

podopslifecycleutil "kusionstack.io/kuperator/pkg/controllers/podopslifecycle"
)

// IsDuringOps decides whether the Pod is during ops or not
Expand Down Expand Up @@ -85,6 +88,18 @@
return false, nil
}

// BeginWithCleaningOld is used for an CRD Operator to begin a lifecycle with cleaning the old lifecycle
func BeginWithCleaningOld(c client.Client, adapter LifecycleAdapter, obj client.Object, updateFunc ...UpdateFunc) (updated bool, err error) {
if podInUpdateLifecycle, err := podopslifecycleutil.IsLifecycleOnPod(adapter.GetID(), obj.(*corev1.Pod)); err != nil {
return false, fmt.Errorf("fail to check %s PodOpsLifecycle on Pod %s/%s: %s", adapter.GetID(), obj.GetNamespace(), obj.GetName(), err)
} else if podInUpdateLifecycle {
if err := Undo(c, adapter, obj); err != nil {
return false, err
}

Check warning on line 98 in pkg/controllers/utils/podopslifecycle/utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/utils/podopslifecycle/utils.go#L92-L98

Added lines #L92 - L98 were not covered by tests
}
return Begin(c, adapter, obj, updateFunc...)

Check warning on line 100 in pkg/controllers/utils/podopslifecycle/utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/utils/podopslifecycle/utils.go#L100

Added line #L100 was not covered by tests
}

// AllowOps is used to check whether the PodOpsLifecycle phase is in UPGRADE to do following operations.
func AllowOps(adapter LifecycleAdapter, operationDelaySeconds int32, obj client.Object) (requeueAfter *time.Duration, allow bool) {
if !IsDuringOps(adapter, obj) {
Expand Down Expand Up @@ -139,6 +154,12 @@
return false, err
}

// Undo is used for an CRD Operator to undo a lifecycle
func Undo(c client.Client, adapter LifecycleAdapter, obj client.Object) error {
setUndo(adapter, obj)
return c.Update(context.Background(), obj)

Check warning on line 160 in pkg/controllers/utils/podopslifecycle/utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/utils/podopslifecycle/utils.go#L158-L160

Added lines #L158 - L160 were not covered by tests
}

func checkOperatingID(adapter LifecycleAdapter, obj client.Object) (val string, ok bool) {
labelID := fmt.Sprintf("%s/%s", v1alpha1.PodOperatingLabelPrefix, adapter.GetID())
_, ok = obj.GetLabels()[labelID]
Expand Down Expand Up @@ -177,6 +198,11 @@
return
}

func setUndo(adapter LifecycleAdapter, obj client.Object) {
labelUndo := fmt.Sprintf("%s/%s", v1alpha1.PodUndoOperationTypeLabelPrefix, adapter.GetID())
obj.GetLabels()[labelUndo] = string(adapter.GetType())

Check warning on line 203 in pkg/controllers/utils/podopslifecycle/utils.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/utils/podopslifecycle/utils.go#L201-L203

Added lines #L201 - L203 were not covered by tests
}

func deleteOperatingID(adapter LifecycleAdapter, obj client.Object) (val string, ok bool) {
labelID := fmt.Sprintf("%s/%s", v1alpha1.PodOperatingLabelPrefix, adapter.GetID())
delete(obj.GetLabels(), labelID)
Expand Down
Loading
Loading