Skip to content

Commit

Permalink
ptr: update pod detail after ptr status sync (#306)
Browse files Browse the repository at this point in the history
pdr: update pod detail after ptr status sync
  • Loading branch information
Eikykun authored Nov 22, 2024
1 parent 1f4d86d commit 281e071
Showing 1 changed file with 40 additions and 29 deletions.
69 changes: 40 additions & 29 deletions pkg/controllers/podtransitionrule/podtransitionrule_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

appsv1alpha1 "kusionstack.io/kube-api/apps/v1alpha1"

"kusionstack.io/kuperator/pkg/controllers/podtransitionrule/processor"
"kusionstack.io/kuperator/pkg/controllers/podtransitionrule/register"
podtransitionruleutils "kusionstack.io/kuperator/pkg/controllers/podtransitionrule/utils"
Expand Down Expand Up @@ -104,37 +105,37 @@ type PodTransitionRuleReconciler struct {
func (r *PodTransitionRuleReconciler) Reconcile(ctx context.Context, request reconcile.Request) (result reconcile.Result, reconcileErr error) {
logger := r.Logger.WithValues("podTransitionRule", request.String())
result = reconcile.Result{}
podTransitionRule := &appsv1alpha1.PodTransitionRule{}
if err := r.Client.Get(context.TODO(), request.NamespacedName, podTransitionRule); err != nil {
instance := &appsv1alpha1.PodTransitionRule{}
if err := r.Client.Get(context.TODO(), request.NamespacedName, instance); err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

if !podtransitionruleutils.PodTransitionRuleVersionExpectation.SatisfiedExpectations(commonutils.ObjectKeyString(podTransitionRule), podTransitionRule.ResourceVersion) {
logger.Info("podTransitionRule's resourceVersion is too old, retry later", "resourceVersion.now", podTransitionRule.ResourceVersion)
if !podtransitionruleutils.PodTransitionRuleVersionExpectation.SatisfiedExpectations(commonutils.ObjectKeyString(instance), instance.ResourceVersion) {
logger.Info("podTransitionRule's resourceVersion is too old, retry later", "resourceVersion.now", instance.ResourceVersion)
return reconcile.Result{}, nil
}

selector, _ := metav1.LabelSelectorAsSelector(podTransitionRule.Spec.Selector)
selector, _ := metav1.LabelSelectorAsSelector(instance.Spec.Selector)
selectedPods := &corev1.PodList{}
if err := r.Client.List(context.TODO(), selectedPods, &client.ListOptions{Namespace: podTransitionRule.Namespace, LabelSelector: selector}); err != nil {
if err := r.Client.List(context.TODO(), selectedPods, &client.ListOptions{Namespace: instance.Namespace, LabelSelector: selector}); err != nil {
logger.Error(err, "failed to list pod by podtransitionrule")
return reconcile.Result{}, err
}

// Delete
if podTransitionRule.DeletionTimestamp != nil {
if err := r.cleanUpPodTransitionRulePods(ctx, podTransitionRule); err != nil {
if instance.DeletionTimestamp != nil {
if err := r.cleanUpPodTransitionRulePods(ctx, instance); err != nil {
return reconcile.Result{}, err
}
if !controllerutil.ContainsFinalizer(podTransitionRule, appsv1alpha1.ProtectFinalizer) {
if !controllerutil.ContainsFinalizer(instance, appsv1alpha1.ProtectFinalizer) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, controllerutils.RemoveFinalizer(ctx, r.Client, podTransitionRule, appsv1alpha1.ProtectFinalizer)
} else if !controllerutil.ContainsFinalizer(podTransitionRule, appsv1alpha1.ProtectFinalizer) {
if err := controllerutils.AddFinalizer(ctx, r.Client, podTransitionRule, appsv1alpha1.ProtectFinalizer); err != nil {
return reconcile.Result{}, controllerutils.RemoveFinalizer(ctx, r.Client, instance, appsv1alpha1.ProtectFinalizer)
} else if !controllerutil.ContainsFinalizer(instance, appsv1alpha1.ProtectFinalizer) {
if err := controllerutils.AddFinalizer(ctx, r.Client, instance, appsv1alpha1.ProtectFinalizer); err != nil {
return result, fmt.Errorf("fail to add finalizer on PodTransitionRule %s: %s", request, err)
}
}
Expand All @@ -152,20 +153,30 @@ func (r *PodTransitionRuleReconciler) Reconcile(ctx context.Context, request rec
targetPods[pod.Name] = &selectedPods.Items[i]
}

pods := make([]*corev1.Pod, 0, len(targetPods))
for _, pod := range targetPods {
pods = append(pods, pod)
}

// sync pod detail by latest instance status
if err := r.syncPodsDetail(ctx, instance, pods); err != nil {
return reconcile.Result{}, fmt.Errorf("failed to sync pods details: %w", err)
}

// remove unselected pods
for _, name := range podTransitionRule.Status.Targets {
for _, name := range instance.Status.Targets {
if selectedPodNames.Has(name) {
continue
}

if _, err := r.updatePodTransitionRuleOnPod(ctx, podTransitionRule.Name, name, podTransitionRule.Namespace, podtransitionruleutils.MoveAllPodTransitionRuleInfo); err != nil {
if _, err := r.updatePodTransitionRuleOnPod(ctx, instance.Name, name, instance.Namespace, podtransitionruleutils.MoveAllPodTransitionRuleInfo); err != nil {
logger.Error(err, "failed to remote podtransitionrule on pod", "pod", name)
return result, err
}
}

// process rules
shouldRetry, interval, details, ruleStates := r.process(podTransitionRule, targetPods)
shouldRetry, interval, details, ruleStates := r.process(instance, targetPods)

res := reconcile.Result{
Requeue: shouldRetry,
Expand All @@ -190,31 +201,31 @@ func (r *PodTransitionRuleReconciler) Reconcile(ctx context.Context, request rec
tm := metav1.NewTime(time.Now())
newStatus := &appsv1alpha1.PodTransitionRuleStatus{
Targets: selectedPodNames.List(),
ObservedGeneration: podTransitionRule.Generation,
ObservedGeneration: instance.Generation,
Details: detailList,
RuleStates: ruleStates,
UpdateTime: &tm,
}

if !equalStatus(newStatus, &podTransitionRule.Status) {
podtransitionruleutils.PodTransitionRuleVersionExpectation.ExpectUpdate(commonutils.ObjectKeyString(podTransitionRule), podTransitionRule.ResourceVersion)
podTransitionRule.Status = *newStatus
if err := r.Client.Status().Update(ctx, podTransitionRule); err != nil {
podtransitionruleutils.PodTransitionRuleVersionExpectation.DeleteExpectations(commonutils.ObjectKeyString(podTransitionRule))
if !equalStatus(newStatus, &instance.Status) {
podtransitionruleutils.PodTransitionRuleVersionExpectation.ExpectUpdate(commonutils.ObjectKeyString(instance), instance.ResourceVersion)
instance.Status = *newStatus
if err := r.Client.Status().Update(ctx, instance); err != nil {
podtransitionruleutils.PodTransitionRuleVersionExpectation.DeleteExpectations(commonutils.ObjectKeyString(instance))
logger.Error(err, "failed to update podtransitionrule status")
return reconcile.Result{}, err
}
}
pods := make([]*corev1.Pod, 0, len(targetPods))
for _, pod := range targetPods {
pods = append(pods, pod)
}
return res, r.syncPodsDetail(ctx, podTransitionRule.Name, pods, details)
return res, nil
}

func (r *PodTransitionRuleReconciler) syncPodsDetail(ctx context.Context, podTransitionRuleName string, pods []*corev1.Pod, details map[string]*appsv1alpha1.PodTransitionDetail) error {
func (r *PodTransitionRuleReconciler) syncPodsDetail(ctx context.Context, instance *appsv1alpha1.PodTransitionRule, pods []*corev1.Pod) error {
details := map[string]*appsv1alpha1.PodTransitionDetail{}
for i, detail := range instance.Status.Details {
details[detail.Name] = instance.Status.Details[i]
}
_, err := controllerutils.SlowStartBatch(len(pods), 1, false, func(i int, _ error) error {
return r.updatePodDetail(ctx, pods[i], podTransitionRuleName, details[pods[i].Name])
return r.updatePodDetail(ctx, pods[i], instance.Name, details[pods[i].Name])
})
return err
}
Expand All @@ -223,7 +234,7 @@ func (r *PodTransitionRuleReconciler) updatePodDetail(ctx context.Context, pod *
detailAnno := appsv1alpha1.AnnotationPodTransitionRuleDetailPrefix + "/" + podTransitionRuleName
var newDetail string
if detail != nil {
newDetail = utils.DumpJSON(&appsv1alpha1.PodTransitionDetail{Stage: detail.Stage, Passed: detail.Passed})
newDetail = utils.DumpJSON(&appsv1alpha1.PodTransitionDetail{Stage: detail.Stage, Passed: detail.Passed, RejectInfo: detail.RejectInfo})
} else {
newDetail = utils.DumpJSON(&appsv1alpha1.PodTransitionDetail{Stage: "Unknown", Passed: true})
}
Expand Down

0 comments on commit 281e071

Please sign in to comment.