Skip to content

Commit

Permalink
Merge pull request #966 from hzxuzhonghu/inqueue
Browse files Browse the repository at this point in the history
Record Inqueue job resource request in queueAttr
  • Loading branch information
volcano-sh-bot authored Jul 30, 2020
2 parents f0bd7db + 8fbcd7d commit 81fd056
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 21 deletions.
7 changes: 0 additions & 7 deletions pkg/scheduler/actions/enqueue/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) {

queueMap[queue.UID] = queue
queues.Push(queue)
ssn.InqueueJobResource[queue.UID] = api.EmptyResource()
}

if job.PodGroup.Status.Phase == scheduling.PodGroupPending {
Expand All @@ -78,11 +77,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) {
klog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobsMap[job.Queue].Push(job)
}

if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
klog.V(3).Infof("Added Job <%s/%s> into InqueueResource", job.Namespace, job.Name)
ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources))
}
}

klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))
Expand Down Expand Up @@ -129,7 +123,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) {
if inqueue {
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
ssn.Jobs[job.UID] = job
ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources))
}

// Added Queue back until no job in Queue.
Expand Down
17 changes: 7 additions & 10 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,10 @@ type Session struct {
// This should not be mutated after initiated
podGroupStatus map[api.JobID]scheduling.PodGroupStatus

Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
Queues map[api.QueueID]*api.QueueInfo
NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo
InqueueJobResource map[api.QueueID]*api.Resource
Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
Queues map[api.QueueID]*api.QueueInfo
NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo

Tiers []conf.Tier
Configurations []conf.Configuration
Expand Down Expand Up @@ -82,10 +81,9 @@ func openSession(cache cache.Cache) *Session {

podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{},

Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},
InqueueJobResource: map[api.QueueID]*api.Resource{},
Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},

plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
Expand Down Expand Up @@ -157,7 +155,6 @@ func closeSession(ssn *Session) {
ssn.jobOrderFns = nil
ssn.namespaceOrderFns = nil
ssn.queueOrderFns = nil
ssn.InqueueJobResource = nil

klog.V(3).Infof("Close Session %v", ssn.UID)
}
Expand Down
19 changes: 15 additions & 4 deletions pkg/scheduler/plugins/proportion/proportion.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package proportion

import (
"k8s.io/klog"

"volcano.sh/volcano/pkg/apis/scheduling"
"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/api/helpers"
"volcano.sh/volcano/pkg/scheduler/framework"
Expand All @@ -43,6 +45,8 @@ type queueAttr struct {
deserved *api.Resource
allocated *api.Resource
request *api.Resource
// inqueue represents the resource request of the inqueue job
inqueue *api.Resource
}

// New return proportion action
Expand All @@ -69,7 +73,6 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
// Build attributes for Queues.
for _, job := range ssn.Jobs {
klog.V(4).Infof("Considering Job <%s/%s>.", job.Namespace, job.Name)

if _, found := pp.queueOpts[job.Queue]; !found {
queue := ssn.Queues[job.Queue]
attr := &queueAttr{
Expand All @@ -80,25 +83,29 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {
deserved: api.EmptyResource(),
allocated: api.EmptyResource(),
request: api.EmptyResource(),
inqueue: api.EmptyResource(),
}
pp.queueOpts[job.Queue] = attr
klog.V(4).Infof("Added Queue <%s> attributes.", job.Queue)
}

attr := pp.queueOpts[job.Queue]
for status, tasks := range job.TaskStatusIndex {
if api.AllocatedStatus(status) {
for _, t := range tasks {
attr := pp.queueOpts[job.Queue]
attr.allocated.Add(t.Resreq)
attr.request.Add(t.Resreq)
}
} else if status == api.Pending {
for _, t := range tasks {
attr := pp.queueOpts[job.Queue]
attr.request.Add(t.Resreq)
}
}
}

if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue {
attr.inqueue.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
}
}

// Record metrics
Expand Down Expand Up @@ -242,7 +249,11 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) {

minReq := api.NewResource(*job.PodGroup.Spec.MinResources)
// The queue resource quota limit has not reached
return minReq.Add(attr.allocated).Add(ssn.InqueueJobResource[job.Queue]).LessEqual(api.NewResource(queue.Queue.Spec.Capability))
inqueue := minReq.Add(attr.allocated).Add(attr.inqueue).LessEqual(api.NewResource(queue.Queue.Spec.Capability))
if inqueue {
attr.inqueue.Add(api.NewResource(*job.PodGroup.Spec.MinResources))
}
return inqueue
})

// Register event handlers.
Expand Down

0 comments on commit 81fd056

Please sign in to comment.