Merge pull request kubernetes-retired#417 from chenyangxueHDU/master
fix BestEffort for overused
k8s-ci-robot authored Oct 12, 2018
2 parents 1bfc626 + 54c4305 commit 5ae939f
Showing 3 changed files with 146 additions and 0 deletions.
32 changes: 32 additions & 0 deletions pkg/scheduler/plugins/priority/priority.go
@@ -20,6 +20,8 @@ import (
"github.com/golang/glog"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/framework"

"k8s.io/api/core/v1"
)

type priorityPlugin struct {
@@ -37,6 +39,11 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
		lv := l.(*api.TaskInfo)
		rv := r.(*api.TaskInfo)

		// TODO: quick fix, enhance it later with another option.
		if res := compareQoS(lv.Pod, rv.Pod); res != 0 {
			return res
		}

		glog.V(3).Infof("Priority TaskOrder: <%v/%v> priority is %v, <%v/%v> priority is %v",
			lv.Namespace, lv.Name, lv.Priority, rv.Namespace, rv.Name, rv.Priority)

@@ -81,3 +88,28 @@ func (pp *priorityPlugin) OnSessionOpen(ssn *framework.Session) {
}

func (pp *priorityPlugin) OnSessionClose(ssn *framework.Session) {}

// compareQoS makes BestEffort > Burstable/Guaranteed.
func compareQoS(l, r *v1.Pod) int {
	lq := l.Status.QOSClass
	rq := r.Status.QOSClass

	glog.V(3).Infof("Priority TaskOrder: <%v/%v> QoS is %v, <%v/%v> QoS is %v",
		l.Namespace, l.Name, lq, r.Namespace, r.Name, rq)

	if lq == rq {
		return 0
	}

	// BestEffort > Burstable/Guaranteed
	if lq == v1.PodQOSBestEffort {
		return -1
	}

	// Burstable/Guaranteed < BestEffort
	if rq == v1.PodQOSBestEffort {
		return 1
	}

	return 0
}
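
Taken together with the TaskOrder hook above, the new comparator means a BestEffort task always sorts ahead of a Burstable or Guaranteed task, and only pairs that tie on QoS fall through to the regular priority comparison. A minimal test-style sketch of that contract (not part of this commit; the pod fixtures and test name are hypothetical):

package priority

import (
	"testing"

	"k8s.io/api/core/v1"
)

func TestCompareQoSOrdering(t *testing.T) {
	bestEffort := &v1.Pod{Status: v1.PodStatus{QOSClass: v1.PodQOSBestEffort}}
	burstable := &v1.Pod{Status: v1.PodStatus{QOSClass: v1.PodQOSBurstable}}
	guaranteed := &v1.Pod{Status: v1.PodStatus{QOSClass: v1.PodQOSGuaranteed}}

	// A BestEffort pod compares ahead of (-1) any non-BestEffort pod.
	if got := compareQoS(bestEffort, burstable); got != -1 {
		t.Errorf("BestEffort vs Burstable = %d, want -1", got)
	}
	if got := compareQoS(guaranteed, bestEffort); got != 1 {
		t.Errorf("Guaranteed vs BestEffort = %d, want 1", got)
	}
	// Burstable vs Guaranteed is treated as a tie and falls through to priority.
	if got := compareQoS(burstable, guaranteed); got != 0 {
		t.Errorf("Burstable vs Guaranteed = %d, want 0", got)
	}
}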
13 changes: 13 additions & 0 deletions test/e2e/job.go
@@ -133,5 +133,18 @@ var _ = Describe("Job E2E Test", func() {
		err = waitTasksReady(context, job3.Name, int(rep)/3)
		Expect(err).NotTo(HaveOccurred())
	})
	It("Schedule BestEffort Job", func() {
		context := initTestContext()
		defer cleanupTestContext(context)

		pgName := "test"

		createPodGroup(context, "burstable", pgName, 2)

		bestEffort := createJobWithoutPodGroup(context, "besteffort", 1, "busybox", nil, pgName)
		burstable := createJobWithoutPodGroup(context, "burstable", 1, "busybox", oneCPU, pgName)

		err := waitJobReadyWithPodGroup(context, pgName, bestEffort.Name, burstable.Name)
		Expect(err).NotTo(HaveOccurred())
	})
})
101 changes: 101 additions & 0 deletions test/e2e/util.go
@@ -620,3 +620,104 @@ func getPodOfJob(ctx *context, jobName string) []*v1.Pod {

	return qjpod
}

func createPodGroup(
	context *context,
	jobName, name string,
	min int32,
) {
	jns, _, _ := splictJobName(context, jobName)

	pg := &arbv1.PodGroup{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: jns,
		},
		Spec: arbv1.PodGroupSpec{
			MinMember: min,
		},
	}

	pg, err := context.karclient.Scheduling().PodGroups(jns).Create(pg)
	Expect(err).NotTo(HaveOccurred())
}

func createJobWithoutPodGroup(
	context *context,
	name string,
	rep int32,
	img string,
	req v1.ResourceList,
	pg string,
) *batchv1.Job {
	containers := createContainers(img, req, 0)
	jns, jn, _ := splictJobName(context, name)

	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      jn,
			Namespace: jns,
		},
		Spec: batchv1.JobSpec{
			Parallelism: &rep,
			Completions: &rep,
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Annotations: map[string]string{arbv1.GroupNameAnnotationKey: pg},
				},
				Spec: v1.PodSpec{
					SchedulerName: "kube-batch",
					RestartPolicy: v1.RestartPolicyNever,
					Containers:    containers,
				},
			},
		},
	}

	job, err := context.kubeclient.BatchV1().Jobs(job.Namespace).Create(job)
	Expect(err).NotTo(HaveOccurred())

	return job
}

func waitJobReadyWithPodGroup(ctx *context, pgName string, jobNames ...string) error {
	return wait.Poll(100*time.Millisecond, oneMinute, taskReadyWithPodGroup(ctx, -1, pgName, jobNames))
}

// taskReadyWithPodGroup reports true once the pods of the given jobs that are
// Running or Succeeded reach taskNum; a negative taskNum falls back to the
// PodGroup's MinMember.
func taskReadyWithPodGroup(ctx *context, taskNum int, pgName string, jobNames []string) wait.ConditionFunc {
	return func() (bool, error) {
		pg := &arbv1.PodGroup{}

		readyTaskNum := 0
		for _, name := range jobNames {
			jns, jn, _ := splictJobName(ctx, name)

			queueJob, err := ctx.kubeclient.BatchV1().Jobs(jns).Get(jn, metav1.GetOptions{})
			Expect(err).NotTo(HaveOccurred())

			pods, err := ctx.kubeclient.CoreV1().Pods(jns).List(metav1.ListOptions{})
			Expect(err).NotTo(HaveOccurred())

			for _, pod := range pods.Items {
				labelSelector := labels.SelectorFromSet(queueJob.Spec.Selector.MatchLabels)
				if !labelSelector.Matches(labels.Set(pod.Labels)) ||
					!metav1.IsControlledBy(&pod, queueJob) {
					continue
				}
				if pod.Status.Phase == v1.PodRunning || pod.Status.Phase == v1.PodSucceeded {
					readyTaskNum++
				}
			}

			pg, err = ctx.karclient.Scheduling().PodGroups(jns).Get(pgName, metav1.GetOptions{})
			Expect(err).NotTo(HaveOccurred())
		}

		if taskNum < 0 {
			taskNum = int(pg.Spec.MinMember)
		}

		return taskNum <= readyTaskNum, nil
	}
}
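
For reference, a minimal self-contained sketch (not from this commit; the names here are illustrative) of the wait.Poll pattern the helpers above rely on: the condition closure runs every 100ms until it reports true or the timeout expires, with the counter standing in for the Running/Succeeded pod count compared against MinMember.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ready := 0
	// Poll every 100ms for up to a minute; stop as soon as the condition holds.
	err := wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
		ready++ // stand-in for counting Running/Succeeded pods
		return ready >= 2, nil
	})
	fmt.Println("ready:", ready, "err:", err)
}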
