diff --git a/charts/templates/crd/apps.kusionstack.io_operationjobs.yaml b/charts/templates/crd/apps.kusionstack.io_operationjobs.yaml index 2750ef50..f12c5ef7 100644 --- a/charts/templates/crd/apps.kusionstack.io_operationjobs.yaml +++ b/charts/templates/crd/apps.kusionstack.io_operationjobs.yaml @@ -59,7 +59,7 @@ spec: activeDeadlineSeconds: description: |- Specify the duration in seconds relative to the startTime - that the job may be active before the system tries to terminate it + that the target may be active before the system tries to terminate it format: int32 type: integer operationDelaySeconds: @@ -92,7 +92,7 @@ spec: status: description: OperationJobStatus defines the observed state of OperationJob properties: - endTimestamp: + endTime: description: Operation end time format: date-time type: string @@ -109,7 +109,7 @@ spec: progress: description: Phase indicates the of the OperationJob type: string - startTimestamp: + startTime: description: Operation start time format: date-time type: string @@ -121,6 +121,10 @@ spec: description: Operation details of the target pods items: properties: + endTime: + description: target operation end time + format: date-time + type: string error: description: error indicates the error info of progressing properties: @@ -143,6 +147,10 @@ spec: progress: description: operation progress of target pod type: string + startTime: + description: target operation start time + format: date-time + type: string type: object type: array totalPodCount: diff --git a/config/crd/bases/apps.kusionstack.io_operationjobs.yaml b/config/crd/bases/apps.kusionstack.io_operationjobs.yaml index 2750ef50..f12c5ef7 100644 --- a/config/crd/bases/apps.kusionstack.io_operationjobs.yaml +++ b/config/crd/bases/apps.kusionstack.io_operationjobs.yaml @@ -59,7 +59,7 @@ spec: activeDeadlineSeconds: description: |- Specify the duration in seconds relative to the startTime - that the job may be active before the system tries to terminate it + that the target may be active before the system tries to terminate it format: int32 type: integer operationDelaySeconds: @@ -92,7 +92,7 @@ spec: status: description: OperationJobStatus defines the observed state of OperationJob properties: - endTimestamp: + endTime: description: Operation end time format: date-time type: string @@ -109,7 +109,7 @@ spec: progress: description: Phase indicates the of the OperationJob type: string - startTimestamp: + startTime: description: Operation start time format: date-time type: string @@ -121,6 +121,10 @@ spec: description: Operation details of the target pods items: properties: + endTime: + description: target operation end time + format: date-time + type: string error: description: error indicates the error info of progressing properties: @@ -143,6 +147,10 @@ spec: progress: description: operation progress of target pod type: string + startTime: + description: target operation start time + format: date-time + type: string type: object type: array totalPodCount: diff --git a/go.mod b/go.mod index 3f7a7b9d..5f05d38b 100644 --- a/go.mod +++ b/go.mod @@ -94,7 +94,7 @@ require ( k8s.io/apiserver v0.22.6 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect k8s.io/kubectl v0.29.0 - kusionstack.io/kube-api v0.6.0 + kusionstack.io/kube-api v0.6.1-0.20241010064700-c805b4e9064d sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index d608ab8e..5ed5f76f 100644 --- a/go.sum +++ b/go.sum @@ -1139,8 +1139,8 @@ k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -kusionstack.io/kube-api v0.6.0 h1:FSvvZvhpAul4mnCQXYI61dIQ8QT5txdPWNwWXdjJHME= -kusionstack.io/kube-api v0.6.0/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y= +kusionstack.io/kube-api v0.6.1-0.20241010064700-c805b4e9064d h1:vs0NGK0ZxHBkWn+47LxvvMMPkuv/wjNRq9gJRy8dOTI= +kusionstack.io/kube-api v0.6.1-0.20241010064700-c805b4e9064d/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y= kusionstack.io/resourceconsist v0.0.1 h1:+k/jriq5Ld7fQUYfWSMGynz/FesHtl3Rk2fmQPjBe0g= kusionstack.io/resourceconsist v0.0.1/go.mod h1:816xS/fY6EOUbPFjXIWW/TGs8/YE46qP4ElKeIiwFdU= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= diff --git a/pkg/controllers/operationjob/operationjob_controller.go b/pkg/controllers/operationjob/operationjob_controller.go index 4ab94254..4a3bbf60 100644 --- a/pkg/controllers/operationjob/operationjob_controller.go +++ b/pkg/controllers/operationjob/operationjob_controller.go @@ -117,8 +117,13 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req return reconcile.Result{Requeue: true}, nil } + candidates, err := r.listTargets(ctx, instance) + if err != nil { + return reconcile.Result{}, err + } + if instance.DeletionTimestamp != nil { - if err := r.releaseTargets(ctx, instance); err != nil { + if err := r.releaseTargets(ctx, instance, candidates, true); err != nil { r.Recorder.Eventf(instance, corev1.EventTypeWarning, "ReleaseTargetFailed", fmt.Sprintf("failed to release targets when job deleting: %s", err.Error())) return reconcile.Result{}, err } @@ -128,27 +133,17 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req return reconcile.Result{}, err } - jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, logger) + jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, candidates, logger) if jobDeleted || err != nil { return reconcile.Result{}, err } - reconcileErr := r.doReconcile(ctx, instance) - updateErr := r.updateStatus(ctx, instance) - return requeueResult(requeueAfter), ctrlutils.AggregateErrors([]error{reconcileErr, updateErr}) + err = r.doReconcile(ctx, instance, candidates) + return requeueResult(requeueAfter), err } -func (r *ReconcileOperationJob) getActionHandlerAndTargets(ctx context.Context, instance *appsv1alpha1.OperationJob) ( - actionHandler ActionHandler, enablePodOpsLifecycle bool, candidates []*OpsCandidate, err error) { - if actionHandler, enablePodOpsLifecycle, err = r.getActionHandler(instance); err != nil { - return - } - candidates, err = r.listTargets(ctx, instance) - return -} - -func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob) error { - actionHandler, enablePodOpsLifecycle, candidates, err := r.getActionHandlerAndTargets(ctx, instance) +func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) error { + actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(instance) if err != nil { return err } @@ -160,19 +155,24 @@ func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv getErr := r.getTargetsOpsStatus(ctx, actionHandler, selectedCandidates, enablePodOpsLifecycle, instance) // calculate opsStatus of all candidates instance.Status = r.calculateStatus(instance, candidates) - return controllerutils.AggregateErrors([]error{opsErr, getErr}) + // update operationjob status + updateErr := r.updateStatus(ctx, instance) + return controllerutils.AggregateErrors([]error{opsErr, getErr, updateErr}) } func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) (jobStatus appsv1alpha1.OperationJobStatus) { now := ctrlutils.FormatTimeNow() jobStatus = appsv1alpha1.OperationJobStatus{ - StartTimestamp: instance.Status.StartTimestamp, - EndTimestamp: instance.Status.EndTimestamp, + StartTime: instance.Status.StartTime, + EndTime: instance.Status.EndTime, Progress: instance.Status.Progress, ObservedGeneration: instance.Generation, } for _, candidate := range candidates { + if candidate.OpsStatus.EndTime == nil && IsCandidateOpsFinished(candidate) { + candidate.OpsStatus.EndTime = &now + } jobStatus.TargetDetails = append(jobStatus.TargetDetails, *candidate.OpsStatus) } @@ -197,6 +197,10 @@ func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.Operation if !ojutils.IsJobFinished(&appsv1alpha1.OperationJob{Status: jobStatus}) { jobStatus.Progress = appsv1alpha1.OperationProgressProcessing + if jobStatus.StartTime == nil { + jobStatus.StartTime = &now + } + if pendingPodCount == totalPodCount { jobStatus.Progress = appsv1alpha1.OperationProgressPending } @@ -208,8 +212,8 @@ func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.Operation jobStatus.Progress = appsv1alpha1.OperationProgressSucceeded } - if jobStatus.EndTimestamp == nil { - jobStatus.EndTimestamp = &now + if jobStatus.EndTime == nil { + jobStatus.EndTime = &now } } } diff --git a/pkg/controllers/operationjob/operationjob_controller_test.go b/pkg/controllers/operationjob/operationjob_controller_test.go index a8f5a1d7..0073ac61 100644 --- a/pkg/controllers/operationjob/operationjob_controller_test.go +++ b/pkg/controllers/operationjob/operationjob_controller_test.go @@ -498,7 +498,7 @@ var _ = Describe("operationjob controller", func() { It("deadline and ttl", func() { testcase := "test-deadline-ttl" Expect(createNamespace(c, testcase)).Should(BeNil()) - cs := createCollaSetWithReplicas("foo", testcase, 2) + cs := createCollaSetWithReplicas("foo", testcase, 3) podNames := getPodNamesFromCollaSet(cs) oj := &appsv1alpha1.OperationJob{ @@ -507,7 +507,8 @@ var _ = Describe("operationjob controller", func() { Name: "foo", }, Spec: appsv1alpha1.OperationJobSpec{ - Action: appsv1alpha1.OpsActionReplace, + Partition: int32Pointer(0), + Action: appsv1alpha1.OpsActionReplace, Targets: []appsv1alpha1.PodOpsTarget{ { Name: podNames[0], @@ -515,50 +516,29 @@ var _ = Describe("operationjob controller", func() { { Name: podNames[1], }, + { + Name: podNames[2], + }, }, - ActiveDeadlineSeconds: int32Pointer(10), + ActiveDeadlineSeconds: int32Pointer(3), TTLSecondsAfterFinished: int32Pointer(5), }, } Expect(c.Create(ctx, oj)).Should(BeNil()) - // wait for new pod created - podList := &corev1.PodList{} - Eventually(func() bool { - Expect(c.List(ctx, podList, client.InNamespace(cs.Namespace))).Should(BeNil()) - return len(podList.Items) == 4 - }, time.Second*10, time.Second).Should(BeTrue()) - - // mock only 1 new pod serviceAvailable - Expect(c.List(ctx, podList, client.InNamespace(cs.Namespace))).Should(BeNil()) - for i := range podList.Items { - if _, exist := podList.Items[i].Labels[appsv1alpha1.PodReplacePairOriginName]; exist { - Expect(updatePodWithRetry(podList.Items[i].Namespace, podList.Items[i].Name, func(pod *corev1.Pod) bool { - pod.Labels[appsv1alpha1.PodServiceAvailableLabel] = "true" - return true - })).Should(BeNil()) - break - } - } - - // allow origin pod to be deleted - for i := range podList.Items { - pod := &podList.Items[i] - if _, exist := pod.Labels[appsv1alpha1.PodReplaceIndicationLabelKey]; exist { - Expect(updatePodWithRetry(pod.Namespace, pod.Name, func(pod *corev1.Pod) bool { - labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, poddeletion.OpsLifecycleAdapter.GetID()) - pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano()) - pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey] = fmt.Sprintf("%d", time.Now().UnixNano()) - return true - })).Should(BeNil()) - } + for _, partition := range []int32{0, 1, 2, 3} { + // update partition + Eventually(func() error { + return c.Get(context.TODO(), types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj) + }, time.Second*5, time.Second).Should(BeNil()) + Expect(updateOperationJobWithRetry(oj.Namespace, oj.Name, func(job *appsv1alpha1.OperationJob) bool { + job.Spec.Partition = &partition + return true + })).Should(BeNil()) + // wait for replace failed after ActiveDeadlineSeconds + assertFailedReplicas(oj, partition, time.Second*1000) } - assertSucceededReplicas(oj, 1, time.Second*10) - - // wait for replace failed after ActiveDeadlineSeconds - assertJobProgressFailed(oj, time.Second*10) - assertFailedReplicas(oj, 1, time.Second*10) // wait for operationJob deleted after TTL Eventually(func() bool { diff --git a/pkg/controllers/operationjob/operationjob_manager.go b/pkg/controllers/operationjob/operationjob_manager.go index c77a01f5..57b05ba2 100644 --- a/pkg/controllers/operationjob/operationjob_manager.go +++ b/pkg/controllers/operationjob/operationjob_manager.go @@ -32,6 +32,7 @@ import ( "kusionstack.io/kuperator/pkg/controllers/operationjob/replace" ojutils "kusionstack.io/kuperator/pkg/controllers/operationjob/utils" controllerutils "kusionstack.io/kuperator/pkg/controllers/utils" + ctrlutils "kusionstack.io/kuperator/pkg/controllers/utils" "kusionstack.io/kuperator/pkg/controllers/utils/podopslifecycle" ) @@ -149,6 +150,10 @@ func (r *ReconcileOperationJob) filterAndOperateAllowOpsTargets( } else { candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressProcessing } + if candidate.OpsStatus.StartTime == nil { + now := ctrlutils.FormatTimeNow() + candidate.OpsStatus.StartTime = &now + } } if isAllowedOps { @@ -237,28 +242,37 @@ func (r *ReconcileOperationJob) getTargetsOpsStatus( } // ensureActiveDeadlineAndTTL calculate time to ActiveDeadlineSeconds and TTLSecondsAfterFinished and release targets -func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, logger logr.Logger) (bool, *time.Duration, error) { - isFailed := operationJob.Status.Progress == appsv1alpha1.OperationProgressFailed - isSucceeded := operationJob.Status.Progress == appsv1alpha1.OperationProgressSucceeded - +func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, logger logr.Logger) (bool, *time.Duration, error) { if operationJob.Spec.ActiveDeadlineSeconds != nil { - if !isFailed && !isSucceeded { - leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(operationJob.CreationTimestamp.Time) + var allowReleaseCandidates []*OpsCandidate + for i := range candidates { + candidate := candidates[i] + // just skip if target operation already finished, or not started + if IsCandidateOpsFinished(candidate) || candidate.OpsStatus.StartTime == nil { + continue + } + leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(candidate.OpsStatus.StartTime.Time) if leftTime > 0 { return false, &leftTime, nil } else { logger.Info("should end but still processing") r.Recorder.Eventf(operationJob, corev1.EventTypeNormal, "Timeout", "Try to fail OperationJob for timeout...") // mark operationjob and targets failed and release targets - ojutils.MarkOperationJobFailed(operationJob) - return false, nil, r.releaseTargets(ctx, operationJob) + MarkCandidateFailed(candidate) + allowReleaseCandidates = append(allowReleaseCandidates, candidate) } } + if len(allowReleaseCandidates) > 0 { + releaseErr := r.releaseTargets(ctx, operationJob, allowReleaseCandidates, false) + operationJob.Status = r.calculateStatus(operationJob, candidates) + updateErr := r.updateStatus(ctx, operationJob) + return false, nil, controllerutils.AggregateErrors([]error{releaseErr, updateErr}) + } } if operationJob.Spec.TTLSecondsAfterFinished != nil { - if isFailed || isSucceeded { - leftTime := time.Duration(*operationJob.Spec.TTLSecondsAfterFinished)*time.Second - time.Since(operationJob.Status.EndTimestamp.Time) + if ojutils.IsJobFinished(operationJob) { + leftTime := time.Duration(*operationJob.Spec.TTLSecondsAfterFinished)*time.Second - time.Since(operationJob.Status.EndTime.Time) if leftTime > 0 { return false, &leftTime, nil } else { @@ -269,13 +283,12 @@ func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, } } } - return false, nil, nil } // releaseTargets try to release the targets from operation when the operationJob is deleted -func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob) error { - actionHandler, enablePodOpsLifecycle, candidates, err := r.getActionHandlerAndTargets(ctx, operationJob) +func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, needUpdateStatus bool) error { + actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(operationJob) if err != nil { return err } @@ -288,12 +301,14 @@ func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob releaseErr = controllerutils.AggregateErrors([]error{releaseErr, err}) } // mark candidate as failed if not finished - if IsCandidateOpsFinished(candidate) { - return nil + if !IsCandidateOpsFinished(candidate) { + candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed } - candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed return nil }) + if !needUpdateStatus { + return releaseErr + } operationJob.Status = r.calculateStatus(operationJob, candidates) updateErr := r.updateStatus(ctx, operationJob) return controllerutils.AggregateErrors([]error{releaseErr, updateErr}) diff --git a/pkg/controllers/operationjob/opscore/candidate.go b/pkg/controllers/operationjob/opscore/candidate.go index 8e8a6882..af7ecc85 100644 --- a/pkg/controllers/operationjob/opscore/candidate.go +++ b/pkg/controllers/operationjob/opscore/candidate.go @@ -88,3 +88,9 @@ func IsCandidateServiceAvailable(candidate *OpsCandidate) bool { _, serviceAvailable := candidate.Pod.Labels[appsv1alpha1.PodServiceAvailableLabel] return serviceAvailable } + +func MarkCandidateFailed(candidate *OpsCandidate) { + if candidate.OpsStatus.Progress != appsv1alpha1.OperationProgressSucceeded { + candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed + } +} diff --git a/pkg/controllers/operationjob/utils/common.go b/pkg/controllers/operationjob/utils/common.go index 64f40dba..38acdb67 100644 --- a/pkg/controllers/operationjob/utils/common.go +++ b/pkg/controllers/operationjob/utils/common.go @@ -29,7 +29,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" "kusionstack.io/kuperator/pkg/controllers/operationjob/opscore" - ctrlutils "kusionstack.io/kuperator/pkg/controllers/utils" ) const ( @@ -37,14 +36,6 @@ const ( ReasonGetObjectFailed = "GetObjectFailed" ) -func MarkOperationJobFailed(instance *appsv1alpha1.OperationJob) { - if instance.Status.Progress != appsv1alpha1.OperationProgressSucceeded { - now := ctrlutils.FormatTimeNow() - instance.Status.Progress = appsv1alpha1.OperationProgressFailed - instance.Status.EndTimestamp = &now - } -} - func MapOpsStatusByPod(instance *appsv1alpha1.OperationJob) map[string]*appsv1alpha1.OpsStatus { opsStatusMap := make(map[string]*appsv1alpha1.OpsStatus) for i, opsStatus := range instance.Status.TargetDetails {