Skip to content

Commit

Permalink
refactor opj active deadline seconds
Browse files Browse the repository at this point in the history
  • Loading branch information
ColdsteelRail committed Sep 27, 2024
1 parent f4ff0e8 commit 58ed2ee
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 38 deletions.
6 changes: 5 additions & 1 deletion charts/templates/crd/apps.kusionstack.io_operationjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ spec:
activeDeadlineSeconds:
description: |-
Specify the duration in seconds relative to the startTime
that the job may be active before the system tries to terminate it
that the target may be active before the system tries to terminate it
format: int32
type: integer
operationDelaySeconds:
Expand Down Expand Up @@ -143,6 +143,10 @@ spec:
progress:
description: operation progress of target pod
type: string
startTimestamp:
description: target operation start time
format: date-time
type: string
type: object
type: array
totalPodCount:
Expand Down
6 changes: 5 additions & 1 deletion config/crd/bases/apps.kusionstack.io_operationjobs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ spec:
activeDeadlineSeconds:
description: |-
Specify the duration in seconds relative to the startTime
that the job may be active before the system tries to terminate it
that the target may be active before the system tries to terminate it
format: int32
type: integer
operationDelaySeconds:
Expand Down Expand Up @@ -143,6 +143,10 @@ spec:
progress:
description: operation progress of target pod
type: string
startTimestamp:
description: target operation start time
format: date-time
type: string
type: object
type: array
totalPodCount:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ require (
k8s.io/apiserver v0.22.6 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/kubectl v0.29.0
kusionstack.io/kube-api v0.6.0
kusionstack.io/kube-api v0.6.1-0.20240923062820-d1e1f0ffca8e
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,8 @@ k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSn
k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
kusionstack.io/kube-api v0.6.0 h1:FSvvZvhpAul4mnCQXYI61dIQ8QT5txdPWNwWXdjJHME=
kusionstack.io/kube-api v0.6.0/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y=
kusionstack.io/kube-api v0.6.1-0.20240923062820-d1e1f0ffca8e h1:kgLQN99sgGl5vjCzDYqAhAxSh/1K2/W5FW9ba7oYr/U=
kusionstack.io/kube-api v0.6.1-0.20240923062820-d1e1f0ffca8e/go.mod h1:J0+EHiroG/88X904Y9TV9iMRcoEuD5tXMTLMBDSwM+Y=
kusionstack.io/resourceconsist v0.0.1 h1:+k/jriq5Ld7fQUYfWSMGynz/FesHtl3Rk2fmQPjBe0g=
kusionstack.io/resourceconsist v0.0.1/go.mod h1:816xS/fY6EOUbPFjXIWW/TGs8/YE46qP4ElKeIiwFdU=
modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw=
Expand Down
36 changes: 19 additions & 17 deletions pkg/controllers/operationjob/operationjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,14 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req
return reconcile.Result{Requeue: true}, nil
}

//actionHandler, enablePodOpsLifecycle := r.getActionHandler(instance)
candidates, err := r.listTargets(ctx, instance)
if err != nil {
return reconcile.Result{}, err
}

if instance.DeletionTimestamp != nil {
if err := r.releaseTargets(ctx, instance); err != nil {
if err := r.releaseTargets(ctx, instance, candidates); err != nil {
r.Recorder.Eventf(instance, corev1.EventTypeWarning, "ReleaseTargetFailed", fmt.Sprintf("failed to release targets when job deleting: %s", err.Error()))
return reconcile.Result{}, err
}
Expand All @@ -128,27 +134,17 @@ func (r *ReconcileOperationJob) Reconcile(ctx context.Context, req reconcile.Req
return reconcile.Result{}, err
}

jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, logger)
jobDeleted, requeueAfter, err := r.ensureActiveDeadlineAndTTL(ctx, instance, candidates, logger)
if jobDeleted || err != nil {
return reconcile.Result{}, err
}

reconcileErr := r.doReconcile(ctx, instance)
updateErr := r.updateStatus(ctx, instance)
return requeueResult(requeueAfter), ctrlutils.AggregateErrors([]error{reconcileErr, updateErr})
}

func (r *ReconcileOperationJob) getActionHandlerAndTargets(ctx context.Context, instance *appsv1alpha1.OperationJob) (
actionHandler ActionHandler, enablePodOpsLifecycle bool, candidates []*OpsCandidate, err error) {
if actionHandler, enablePodOpsLifecycle, err = r.getActionHandler(instance); err != nil {
return
}
candidates, err = r.listTargets(ctx, instance)
return
err = r.doReconcile(ctx, instance, candidates)
return requeueResult(requeueAfter), err
}

func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob) error {
actionHandler, enablePodOpsLifecycle, candidates, err := r.getActionHandlerAndTargets(ctx, instance)
func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) error {
actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(instance)
if err != nil {
return err
}
Expand All @@ -160,7 +156,9 @@ func (r *ReconcileOperationJob) doReconcile(ctx context.Context, instance *appsv
getErr := r.getTargetsOpsStatus(ctx, actionHandler, selectedCandidates, enablePodOpsLifecycle, instance)
// calculate opsStatus of all candidates
instance.Status = r.calculateStatus(instance, candidates)
return controllerutils.AggregateErrors([]error{opsErr, getErr})
// update operationjob status
updateErr := r.updateStatus(ctx, instance)
return controllerutils.AggregateErrors([]error{opsErr, getErr, updateErr})
}

func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.OperationJob, candidates []*OpsCandidate) (jobStatus appsv1alpha1.OperationJobStatus) {
Expand Down Expand Up @@ -197,6 +195,10 @@ func (r *ReconcileOperationJob) calculateStatus(instance *appsv1alpha1.Operation
if !ojutils.IsJobFinished(&appsv1alpha1.OperationJob{Status: jobStatus}) {
jobStatus.Progress = appsv1alpha1.OperationProgressProcessing

if jobStatus.StartTimestamp == nil {
jobStatus.EndTimestamp = &now
}

if pendingPodCount == totalPodCount {
jobStatus.Progress = appsv1alpha1.OperationProgressPending
}
Expand Down
38 changes: 32 additions & 6 deletions pkg/controllers/operationjob/operationjob_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ var _ = Describe("operationjob controller", func() {
It("deadline and ttl", func() {
testcase := "test-deadline-ttl"
Expect(createNamespace(c, testcase)).Should(BeNil())
cs := createCollaSetWithReplicas("foo", testcase, 2)
cs := createCollaSetWithReplicas("foo", testcase, 3)
podNames := getPodNamesFromCollaSet(cs)

oj := &appsv1alpha1.OperationJob{
Expand All @@ -444,24 +444,38 @@ var _ = Describe("operationjob controller", func() {
Name: "foo",
},
Spec: appsv1alpha1.OperationJobSpec{
Action: appsv1alpha1.OpsActionReplace,
Partition: int32Pointer(0),
Action: appsv1alpha1.OpsActionReplace,
Targets: []appsv1alpha1.PodOpsTarget{
{
Name: podNames[0],
},
{
Name: podNames[1],
},
{
Name: podNames[2],
},
},
ActiveDeadlineSeconds: int32Pointer(5),
TTLSecondsAfterFinished: int32Pointer(10),
ActiveDeadlineSeconds: int32Pointer(3),
TTLSecondsAfterFinished: int32Pointer(5),
},
}

Expect(c.Create(ctx, oj)).Should(BeNil())

// wait for replace failed after ActiveDeadlineSeconds
assertJobProgressFailed(oj, time.Second*10)
for _, partition := range []int32{0, 1, 2, 3} {
// update partition
Eventually(func() error {
return c.Get(context.TODO(), types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj)
}, time.Second*5, time.Second).Should(BeNil())
Expect(updateOperationJobWithRetry(oj.Namespace, oj.Name, func(job *appsv1alpha1.OperationJob) bool {
job.Spec.Partition = &partition
return true
})).Should(BeNil())
// wait for replace failed after ActiveDeadlineSeconds
assertFailedReplicas(oj, partition, time.Second*10)
}

// wait for operationJob deleted after TTL
Eventually(func() bool {
Expand All @@ -477,6 +491,18 @@ var _ = Describe("operationjob controller", func() {

})

// assertFailedReplicas polls the given OperationJob once per second until its
// Status.FailedPodCount equals failedPodCount, and fails the assertion if that
// does not happen within timeout. oj is refreshed in place on every poll.
func assertFailedReplicas(oj *appsv1alpha1.OperationJob, failedPodCount int32, timeout time.Duration) {
	Eventually(func() bool {
		key := types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}
		getErr := c.Get(ctx, key, oj)
		// A job that cannot be found does not match; keep polling.
		if errors.IsNotFound(getErr) {
			return false
		}
		// Any other fetch error is unexpected.
		Expect(getErr).Should(BeNil())
		return oj.Status.FailedPodCount == failedPodCount
	}, timeout, time.Second).Should(BeTrue())
}

func assertSucceededReplicas(oj *appsv1alpha1.OperationJob, succeededPodCount int32, timeout time.Duration) {
Eventually(func() bool {
err := c.Get(ctx, types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, oj)
Expand Down
34 changes: 22 additions & 12 deletions pkg/controllers/operationjob/operationjob_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"kusionstack.io/kuperator/pkg/controllers/operationjob/replace"
ojutils "kusionstack.io/kuperator/pkg/controllers/operationjob/utils"
controllerutils "kusionstack.io/kuperator/pkg/controllers/utils"
ctrlutils "kusionstack.io/kuperator/pkg/controllers/utils"
"kusionstack.io/kuperator/pkg/controllers/utils/podopslifecycle"
)

Expand Down Expand Up @@ -149,6 +150,10 @@ func (r *ReconcileOperationJob) filterAndOperateAllowOpsTargets(
} else {
candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressProcessing
}
if candidate.OpsStatus.StartTimestamp == nil {
now := ctrlutils.FormatTimeNow()
candidate.OpsStatus.StartTimestamp = &now
}
}

if isAllowedOps {
Expand Down Expand Up @@ -237,27 +242,33 @@ func (r *ReconcileOperationJob) getTargetsOpsStatus(
}

// ensureActiveDeadlineAndTTL calculates the time left until ActiveDeadlineSeconds and TTLSecondsAfterFinished expire, and releases targets that have timed out
func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, logger logr.Logger) (bool, *time.Duration, error) {
isFailed := operationJob.Status.Progress == appsv1alpha1.OperationProgressFailed
isSucceeded := operationJob.Status.Progress == appsv1alpha1.OperationProgressSucceeded

func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate, logger logr.Logger) (bool, *time.Duration, error) {
if operationJob.Spec.ActiveDeadlineSeconds != nil {
if !isFailed && !isSucceeded {
leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(operationJob.CreationTimestamp.Time)
var allowReleaseCandidates []*OpsCandidate
for i := range candidates {
candidate := candidates[i]
// just skip if target operation already finished, or not started
if IsCandidateOpsFinished(candidate) || candidate.OpsStatus.StartTimestamp == nil {
continue
}
leftTime := time.Duration(*operationJob.Spec.ActiveDeadlineSeconds)*time.Second - time.Since(candidate.OpsStatus.StartTimestamp.Time)
if leftTime > 0 {
return false, &leftTime, nil
} else {
logger.Info("should end but still processing")
r.Recorder.Eventf(operationJob, corev1.EventTypeNormal, "Timeout", "Try to fail OperationJob for timeout...")
// mark operationjob and targets failed and release targets
ojutils.MarkOperationJobFailed(operationJob)
return false, nil, r.releaseTargets(ctx, operationJob)
MarkCandidateFailed(candidate)
allowReleaseCandidates = append(allowReleaseCandidates, candidate)
}
}
if len(allowReleaseCandidates) > 0 {
return false, nil, r.releaseTargets(ctx, operationJob, allowReleaseCandidates)
}
}

if operationJob.Spec.TTLSecondsAfterFinished != nil {
if isFailed || isSucceeded {
if ojutils.IsJobFinished(operationJob) {
leftTime := time.Duration(*operationJob.Spec.TTLSecondsAfterFinished)*time.Second - time.Since(operationJob.Status.EndTimestamp.Time)
if leftTime > 0 {
return false, &leftTime, nil
Expand All @@ -269,13 +280,12 @@ func (r *ReconcileOperationJob) ensureActiveDeadlineAndTTL(ctx context.Context,
}
}
}

return false, nil, nil
}

// releaseTargets tries to release the given targets from operation, e.g. when the operationJob is deleted or a target exceeds its active deadline
func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob) error {
actionHandler, enablePodOpsLifecycle, candidates, err := r.getActionHandlerAndTargets(ctx, operationJob)
func (r *ReconcileOperationJob) releaseTargets(ctx context.Context, operationJob *appsv1alpha1.OperationJob, candidates []*OpsCandidate) error {
actionHandler, enablePodOpsLifecycle, err := r.getActionHandler(operationJob)
if err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/controllers/operationjob/opscore/candidate.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,9 @@ func IsCandidateServiceAvailable(candidate *OpsCandidate) bool {
_, serviceAvailable := candidate.Pod.Labels[appsv1alpha1.PodServiceAvailableLabel]
return serviceAvailable
}

// MarkCandidateFailed sets the candidate's operation progress to
// OperationProgressFailed. A candidate whose progress is already
// OperationProgressSucceeded is left unchanged.
func MarkCandidateFailed(candidate *OpsCandidate) {
	if candidate.OpsStatus.Progress == appsv1alpha1.OperationProgressSucceeded {
		return
	}
	candidate.OpsStatus.Progress = appsv1alpha1.OperationProgressFailed
}

0 comments on commit 58ed2ee

Please sign in to comment.