Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

add job priority for kube-batch scheduling #141 #45

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions job_controller/api/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,8 @@ type RunPolicy struct {
// job, for example `minAvailable` for gang-scheduling.
type SchedulingPolicy struct {
	// MinAvailable is optional; a nil value is omitted from the JSON form.
	MinAvailable *int32 `json:"minAvailable,omitempty"`

	// PriorityClassName is the name of a Kubernetes PriorityClass resource
	// (list the available ones with `kubectl get priorityclass`). It is
	// optional; a nil value is omitted from the JSON form.
	PriorityClassName *string `json:"priorityClassName,omitempty"`
}
23 changes: 22 additions & 1 deletion job_controller/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (jc *JobController) deletePodsAndServices(runPolicy *apiv1.RunPolicy, job i
}
return nil
}

// TTL means time to live
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line, too.

func (jc *JobController) cleanupJobIfTTL(runPolicy *apiv1.RunPolicy, jobStatus apiv1.JobStatus, job interface{}) error {
currentTime := time.Now()
metaObject, _ := job.(metav1.Object)
Expand Down Expand Up @@ -191,6 +191,16 @@ func (jc *JobController) ReconcileJobs(
return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
}

if jc.Config.EnableGangScheduling {
minAvailableReplicas := getTotalReplicas(replicas)
priorityClassName := getPriorityClassName(runPolicy)
//_, err := pc.SyncPodGroup(job, minAvailableReplicas)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why keep this ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I referenced the EnableGangScheduling code in the other operators and added the priority attribute and function.

_, err := jc.SyncPodGroup(metaObject, minAvailableReplicas, priorityClassName)
if err != nil {
log.Warnf("Sync PodGroup %v: %v", jobName, err)
}
}

// Save the current state of the replicas
replicasStatus := make(map[string]v1.PodPhase)

Expand Down Expand Up @@ -299,3 +309,14 @@ func (jc *JobController) cleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.
jc.WorkQueue.AddRateLimited(key)
return nil
}
func getPriorityClassName(runPolicy *apiv1.RunPolicy) string {
priorityClassName := *runPolicy.SchedulingPolicy.PriorityClassName
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Runtime error here

return priorityClassName
}
func getTotalReplicas(replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) int32 {
jobReplicas := int32(0)
for _, r := range replicas {
jobReplicas += *r.Replicas
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should check if r.Replicas is nil here, too.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got it

}
return jobReplicas
}
3 changes: 2 additions & 1 deletion job_controller/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (jc *JobController) GenLabels(jobName string) map[string]string {
}
}

func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32) (*v1alpha1.PodGroup, error) {
func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas int32,priorityClassName string) (*v1alpha1.PodGroup, error) {

kubeBatchClientInterface := jc.KubeBatchClientSet
// Check whether podGroup exists or not
Expand All @@ -68,6 +68,7 @@ func (jc *JobController) SyncPodGroup(job metav1.Object, minAvailableReplicas in
},
Spec: v1alpha1.PodGroupSpec{
MinMember: minAvailable.IntVal,
PriorityClassName: priorityClassName,
},
}
return kubeBatchClientInterface.SchedulingV1alpha1().PodGroups(job.GetNamespace()).Create(createPodGroup)
Expand Down