diff --git a/pkg/scheduler/actions/enqueue/enqueue.go b/pkg/scheduler/actions/enqueue/enqueue.go index aa1929ab40..408c64004d 100644 --- a/pkg/scheduler/actions/enqueue/enqueue.go +++ b/pkg/scheduler/actions/enqueue/enqueue.go @@ -68,7 +68,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) { queueMap[queue.UID] = queue queues.Push(queue) - ssn.InqueueJobResource[queue.UID] = api.EmptyResource() } if job.PodGroup.Status.Phase == scheduling.PodGroupPending { @@ -78,11 +77,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) { klog.V(3).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) jobsMap[job.Queue].Push(job) } - - if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue { - klog.V(3).Infof("Added Job <%s/%s> into InqueueResource", job.Namespace, job.Name) - ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources)) - } } klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap)) @@ -129,7 +123,6 @@ func (enqueue *Action) Execute(ssn *framework.Session) { if inqueue { job.PodGroup.Status.Phase = scheduling.PodGroupInqueue ssn.Jobs[job.UID] = job - ssn.InqueueJobResource[job.Queue].Add(api.NewResource(*job.PodGroup.Spec.MinResources)) } // Added Queue back until no job in Queue. diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index efc6bff483..c83e8e4757 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -44,11 +44,10 @@ type Session struct { // This should not be mutated after initiated podGroupStatus map[api.JobID]scheduling.PodGroupStatus - Jobs map[api.JobID]*api.JobInfo - Nodes map[string]*api.NodeInfo - Queues map[api.QueueID]*api.QueueInfo - NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo - InqueueJobResource map[api.QueueID]*api.Resource + Jobs map[api.JobID]*api.JobInfo + Nodes map[string]*api.NodeInfo + Queues map[api.QueueID]*api.QueueInfo + NamespaceInfo map[api.NamespaceName]*api.NamespaceInfo Tiers []conf.Tier Configurations []conf.Configuration @@ -82,10 +81,9 @@ func openSession(cache cache.Cache) *Session { podGroupStatus: map[api.JobID]scheduling.PodGroupStatus{}, - Jobs: map[api.JobID]*api.JobInfo{}, - Nodes: map[string]*api.NodeInfo{}, - Queues: map[api.QueueID]*api.QueueInfo{}, - InqueueJobResource: map[api.QueueID]*api.Resource{}, + Jobs: map[api.JobID]*api.JobInfo{}, + Nodes: map[string]*api.NodeInfo{}, + Queues: map[api.QueueID]*api.QueueInfo{}, plugins: map[string]Plugin{}, jobOrderFns: map[string]api.CompareFn{}, @@ -157,7 +155,6 @@ func closeSession(ssn *Session) { ssn.jobOrderFns = nil ssn.namespaceOrderFns = nil ssn.queueOrderFns = nil - ssn.InqueueJobResource = nil klog.V(3).Infof("Close Session %v", ssn.UID) } diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index f4a5279cde..5c39acf3fc 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -18,6 +18,8 @@ package proportion import ( "k8s.io/klog" + + "volcano.sh/volcano/pkg/apis/scheduling" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/api/helpers" "volcano.sh/volcano/pkg/scheduler/framework" @@ -43,6 +45,8 @@ type queueAttr struct { deserved *api.Resource allocated *api.Resource request *api.Resource + // inqueue represents the resource request of the inqueue job + inqueue *api.Resource } // New return proportion action @@ -69,7 +73,6 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { // Build attributes for Queues. for _, job := range ssn.Jobs { klog.V(4).Infof("Considering Job <%s/%s>.", job.Namespace, job.Name) - if _, found := pp.queueOpts[job.Queue]; !found { queue := ssn.Queues[job.Queue] attr := &queueAttr{ @@ -80,25 +83,29 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { deserved: api.EmptyResource(), allocated: api.EmptyResource(), request: api.EmptyResource(), + inqueue: api.EmptyResource(), } pp.queueOpts[job.Queue] = attr klog.V(4).Infof("Added Queue <%s> attributes.", job.Queue) } + attr := pp.queueOpts[job.Queue] for status, tasks := range job.TaskStatusIndex { if api.AllocatedStatus(status) { for _, t := range tasks { - attr := pp.queueOpts[job.Queue] attr.allocated.Add(t.Resreq) attr.request.Add(t.Resreq) } } else if status == api.Pending { for _, t := range tasks { - attr := pp.queueOpts[job.Queue] attr.request.Add(t.Resreq) } } } + + if job.PodGroup.Status.Phase == scheduling.PodGroupInqueue { + attr.inqueue.Add(api.NewResource(*job.PodGroup.Spec.MinResources)) + } } // Record metrics @@ -242,7 +249,11 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { minReq := api.NewResource(*job.PodGroup.Spec.MinResources) // The queue resource quota limit has not reached - return minReq.Add(attr.allocated).Add(ssn.InqueueJobResource[job.Queue]).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) + inqueue := minReq.Add(attr.allocated).Add(attr.inqueue).LessEqual(api.NewResource(queue.Queue.Spec.Capability)) + if inqueue { + attr.inqueue.Add(api.NewResource(*job.PodGroup.Spec.MinResources)) + } + return inqueue }) // Register event handlers.