This repository has been archived by the owner on May 25, 2023. It is now read-only.

fix BestEffort for overused #417

Merged
merged 3 commits on Oct 12, 2018
32 changes: 32 additions & 0 deletions pkg/scheduler/plugins/priority/priority.go
@@ -20,6 +20,8 @@ import (
"github.com/golang/glog"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"

"k8s.io/api/core/v1"
)

type priorityPlugin struct {
@@ -37,6 +39,11 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
lv := l.(*api.TaskInfo)
rv := r.(*api.TaskInfo)

// TODO: quick fix; enhance it later with a better option.
if res := compareQoS(lv.Pod, rv.Pod); res != 0 {
return res
}

glog.V(3).Infof("Priority TaskOrder: <%v/%v> priority is %v, <%v/%v> priority is %v",
lv.Namespace, lv.Name, lv.Priority, rv.Namespace, rv.Name, rv.Priority)

@@ -81,3 +88,28 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
}

func (pp *priorityPlugin) OnSessionClose(ssn *framework.Session) {}

// compareQoS orders BestEffort pods ahead of Burstable/Guaranteed pods.
func compareQoS(l, r *v1.Pod) int {
lq := l.Status.QOSClass
rq := r.Status.QOSClass

glog.V(3).Infof("Priority TaskOrder: <%v/%v> QoS is %v, <%v/%v> QoS is %v",
l.Namespace, l.Name, lq, r.Namespace, r.Name, rq)

if lq == rq {
return 0
}

// BestEffort > Burstable/Guaranteed
if lq == v1.PodQOSBestEffort {
return -1
}

// Burstable/Guaranteed < BestEffort
if rq == v1.PodQOSBestEffort {
return 1
}

return 0
}
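
For reference, a minimal unit-test sketch (not part of this PR) exercising the ordering compareQoS produces; a hypothetical priority_test.go in the same package could look like this:

package priority

import (
    "testing"

    "k8s.io/api/core/v1"
)

// podWithQoS builds a bare pod whose status carries the given QoS class.
func podWithQoS(qos v1.PodQOSClass) *v1.Pod {
    return &v1.Pod{Status: v1.PodStatus{QOSClass: qos}}
}

func TestCompareQoS(t *testing.T) {
    cases := []struct {
        l, r v1.PodQOSClass
        want int
    }{
        {v1.PodQOSBestEffort, v1.PodQOSBurstable, -1}, // BestEffort is ordered first
        {v1.PodQOSBurstable, v1.PodQOSBestEffort, 1},  // non-BestEffort is ordered after
        {v1.PodQOSGuaranteed, v1.PodQOSBurstable, 0},  // neither is BestEffort: no preference
        {v1.PodQOSBestEffort, v1.PodQOSBestEffort, 0}, // equal QoS: no preference
    }
    for _, c := range cases {
        if got := compareQoS(podWithQoS(c.l), podWithQoS(c.r)); got != c.want {
            t.Errorf("compareQoS(%v, %v) = %d, want %d", c.l, c.r, got, c.want)
        }
    }
}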
13 changes: 13 additions & 0 deletions test/e2e/job.go
@@ -133,5 +133,18 @@ var _ = Describe("Job E2E Test", func() {
err = waitTasksReady(context, job3.Name, int(rep)/3)
Expect(err).NotTo(HaveOccurred())
})
It("Schedule BestEffort Job", func() {
context := initTestContext()
defer cleanupTestContext(context)

pgName := "test"

// Both jobs below share this PodGroup (MinMember=2), so the group becomes
// ready only when the BestEffort pod and the Burstable pod are both running.
createPodGroup(context, "burstable", pgName, 2)

bestEffort := createJobWithoutPodGroup(context, "besteffort", 1, "busybox", nil, pgName)
burstable := createJobWithoutPodGroup(context, "burstable", 1, "busybox", oneCPU, pgName)

err := waitJobReadyWithPodGroup(context, pgName, bestEffort.Name, burstable.Name)
Expect(err).NotTo(HaveOccurred())
})
})
101 changes: 101 additions & 0 deletions test/e2e/util.go
@@ -620,3 +620,104 @@ func getPodOfJob(ctx *context, jobName string) []*v1.Pod {

return qjpod
}

// createPodGroup creates a PodGroup named name with the given MinMember in the
// namespace of jobName.
func createPodGroup(
context *context,
jobName, name string,
min int32,
) {
jns, _, _ := splictJobName(context, jobName)

pg := &arbv1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: jns,
},
Spec: arbv1.PodGroupSpec{
MinMember: min,
},
}

pg, err := context.karclient.Scheduling().PodGroups(jns).Create(pg)
Expect(err).NotTo(HaveOccurred())
}

// createJobWithoutPodGroup creates a batch Job whose pods join the existing
// PodGroup pg via the group-name annotation instead of getting a PodGroup of
// their own.
func createJobWithoutPodGroup(
context *context,
name string,
rep int32,
img string,
req v1.ResourceList,
pg string,
) *batchv1.Job {
containers := createContainers(img, req, 0)
jns, jn, _ := splictJobName(context, name)

job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: jn,
Namespace: jns,
},
Spec: batchv1.JobSpec{
Parallelism: &rep,
Completions: &rep,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{arbv1.GroupNameAnnotationKey: pg},
},
Spec: v1.PodSpec{
SchedulerName: "kube-batch",
RestartPolicy: v1.RestartPolicyNever,
Containers: containers,
},
},
},
}

job, err := context.kubeclient.BatchV1().Jobs(job.Namespace).Create(job)
Expect(err).NotTo(HaveOccurred())

return job
}

// waitJobReadyWithPodGroup waits until the jobs sharing the PodGroup pgName
// have at least MinMember tasks running or succeeded.
func waitJobReadyWithPodGroup(ctx *context, pgName string, jobNames ...string) error {
return wait.Poll(100*time.Millisecond, oneMinute, taskReadyWithPodGroup(ctx, -1, pgName, jobNames))
}

// taskReadyWithPodGroup returns a condition that is true once at least taskNum
// tasks of the given jobs are running or succeeded; a negative taskNum means
// the PodGroup's MinMember is used instead.
func taskReadyWithPodGroup(ctx *context, taskNum int, pgName string, jobNames []string) wait.ConditionFunc {
return func() (bool, error) {
pg := &arbv1.PodGroup{}

readyTaskNum := 0
for _, name := range jobNames {
jns, jn, _ := splictJobName(ctx, name)

queueJob, err := ctx.kubeclient.BatchV1().Jobs(jns).Get(jn, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())

pods, err := ctx.kubeclient.CoreV1().Pods(jns).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())

for _, pod := range pods.Items {
labelSelector := labels.SelectorFromSet(queueJob.Spec.Selector.MatchLabels)
if !labelSelector.Matches(labels.Set(pod.Labels)) ||
!metav1.IsControlledBy(&pod, queueJob) {
continue
}
if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodSucceeded {
readyTaskNum++
}
}

pg, err = ctx.karclient.Scheduling().PodGroups(jns).Get(pgName, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
}

if taskNum < 0 {
taskNum = int(pg.Spec.MinMember)
}

return taskNum <= readyTaskNum, nil
}
}