Skip to content

Commit

Permalink
Merge pull request #960 from hzxuzhonghu/automated-cherry-pick-of-#94…
Browse files Browse the repository at this point in the history
…5-origin-release-0.4

[Cherry-pick] Support job scale down to zero
  • Loading branch information
volcano-sh-bot authored Jul 30, 2020
2 parents b94e093 + 31a9d28 commit 3b0b818
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 11 deletions.
7 changes: 6 additions & 1 deletion pkg/controllers/job/state/running.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@ func (ps *runningState) Execute(action v1alpha1.Action) error {
})
default:
return SyncJob(ps.job, func(status *vcbatch.JobStatus) bool {
if status.Succeeded+status.Failed == TotalTasks(ps.job.Job) {
jobReplicas := TotalTasks(ps.job.Job)
if jobReplicas == 0 {
// when scale down to zero, keep the current job phase
return false
}
if status.Succeeded+status.Failed == jobReplicas {
if status.Succeeded >= ps.job.Job.Spec.MinAvailable {
status.State.Phase = vcbatch.Completed
} else {
Expand Down
8 changes: 4 additions & 4 deletions pkg/webhooks/admission/jobs/validate/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,9 @@ func validateJobCreate(job *v1alpha1.Job, reviewResponse *v1beta1.AdmissionRespo
taskNames := map[string]string{}
var totalReplicas int32

if job.Spec.MinAvailable <= 0 {
if job.Spec.MinAvailable < 0 {
reviewResponse.Allowed = false
return fmt.Sprintf("'minAvailable' must be > 0.")
return fmt.Sprintf("'minAvailable' must be >= 0.")
}

if job.Spec.MaxRetry < 0 {
Expand Down Expand Up @@ -212,8 +212,8 @@ func validateJobUpdate(old, new *v1alpha1.Job) error {
if new.Spec.MinAvailable > totalReplicas {
return fmt.Errorf("'minAvailable' must not be greater than total replicas")
}
if new.Spec.MinAvailable <= 0 {
return fmt.Errorf("'minAvailable' must be > 0")
if new.Spec.MinAvailable < 0 {
return fmt.Errorf("'minAvailable' must be >= 0")
}

if len(old.Spec.Tasks) != len(new.Spec.Tasks) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/webhooks/admission/jobs/validate/admit_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func TestValidateJobCreate(t *testing.T) {
Namespace: namespace,
},
Spec: v1alpha1.JobSpec{
MinAvailable: 0,
MinAvailable: -1,
Queue: "default",
Tasks: []v1alpha1.TaskSpec{
{
Expand All @@ -328,7 +328,7 @@ func TestValidateJobCreate(t *testing.T) {
},
},
reviewResponse: v1beta1.AdmissionResponse{Allowed: false},
ret: "'minAvailable' must be > 0",
ret: "'minAvailable' must be >= 0",
ExpectErr: true,
},
// maxretry less than zero
Expand Down Expand Up @@ -1120,7 +1120,7 @@ func TestValidateJobUpdate(t *testing.T) {
{
name: "invalid minAvailable",
replicas: 4,
minAvailable: 0,
minAvailable: -1,
addTask: false,
mutateTaskName: false,
mutateSpec: false,
Expand Down
73 changes: 72 additions & 1 deletion test/e2e/job_scale_up_down.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ var _ = Describe("Dynamic Job scale up and down", func() {
err := waitJobReady(context, job)
Expect(err).NotTo(HaveOccurred())

// scale up
// scale down
job.Spec.MinAvailable = 1
job.Spec.Tasks[0].Replicas = 1
err = updateJob(context, job)
Expand Down Expand Up @@ -142,4 +142,75 @@ var _ = Describe("Dynamic Job scale up and down", func() {

})

It("Scale down to zero and scale up", func() {
By("init test ctx")
ctx := initTestContext(options{})
defer cleanupTestContext(ctx)

jobName := "scale-down-job"
By("create job")
job := createJob(ctx, &jobSpec{
name: jobName,
plugins: map[string][]string{
"svc": {},
},
tasks: []taskSpec{
{
name: "default",
img: defaultNginxImage,
min: 2,
rep: 2,
req: halfCPU,
},
},
})

// job phase: pending -> running
err := waitJobReady(ctx, job)
Expect(err).NotTo(HaveOccurred())

// scale down
job.Spec.MinAvailable = 0
job.Spec.Tasks[0].Replicas = 0
err = updateJob(ctx, job)
Expect(err).NotTo(HaveOccurred())

// wait for tasks scaled up
err = waitJobReady(ctx, job)
Expect(err).NotTo(HaveOccurred())

// check configmap updated
pluginName := fmt.Sprintf("%s-svc", jobName)
cm, err := ctx.kubeclient.CoreV1().ConfigMaps(ctx.namespace).Get(pluginName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

hosts := svc.GenerateHosts(job)
Expect(hosts).To(Equal(cm.Data))

// scale up
job.Spec.MinAvailable = 2
job.Spec.Tasks[0].Replicas = 2
err = updateJob(ctx, job)
Expect(err).NotTo(HaveOccurred())

// wait for tasks scaled up
err = waitJobReady(ctx, job)
Expect(err).NotTo(HaveOccurred())

// check configmap updated
cm, err = ctx.kubeclient.CoreV1().ConfigMaps(ctx.namespace).Get(pluginName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

hosts = svc.GenerateHosts(job)
Expect(hosts).To(Equal(cm.Data))

// TODO: check others

By("delete job")
err = ctx.vcclient.BatchV1alpha1().Jobs(job.Namespace).Delete(job.Name, nil)
Expect(err).NotTo(HaveOccurred())

err = waitJobCleanedUp(ctx, job)
Expect(err).NotTo(HaveOccurred())
})
})
5 changes: 3 additions & 2 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,9 @@ func updateJob(context *context, job *batchv1alpha1.Job) error {
if err != nil {
return err
}
patchBytes := []byte(fmt.Sprintf(`{"spec":%s}`, spec))
_, err = context.vcclient.BatchV1alpha1().Jobs(job.Namespace).Patch(job.Name, types.MergePatchType, patchBytes)
patch := fmt.Sprintf(`[{"op": "replace", "path": "/spec", "value":%s}]`, spec)
patchBytes := []byte(patch)
_, err = context.vcclient.BatchV1alpha1().Jobs(job.Namespace).Patch(job.Name, types.JSONPatchType, patchBytes)
return err
}

Expand Down

0 comments on commit 3b0b818

Please sign in to comment.