From dcb0cefe376894486c12947d28fed7bc52387653 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Mon, 25 Feb 2019 17:38:18 +0800 Subject: [PATCH] Fixed flaky test. Signed-off-by: Da K. Ma --- pkg/scheduler/actions/allocate/allocate.go | 2 +- pkg/scheduler/actions/preempt/preempt.go | 6 +- pkg/scheduler/actions/reclaim/reclaim.go | 4 +- pkg/scheduler/api/cluster_info.go | 10 +-- pkg/scheduler/cache/cache.go | 33 +++++----- pkg/scheduler/framework/session.go | 63 +++++++------------ pkg/scheduler/framework/statement.go | 16 ++--- pkg/scheduler/plugins/gang/gang.go | 2 +- .../plugins/predicates/predicates.go | 2 +- .../plugins/prioritize/prioritize.go | 4 +- .../plugins/proportion/proportion.go | 16 +++-- 11 files changed, 68 insertions(+), 90 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 8c6ec8953..3c87e4d51 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -50,7 +50,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) { jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) } - if queue, found := ssn.QueueIndex[job.Queue]; found { + if queue, found := ssn.Queues[job.Queue]; found { queues.Push(queue) } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index b5d7cf9e3..ae86aba68 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -51,7 +51,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) { queues := map[api.QueueID]*api.QueueInfo{} for _, job := range ssn.Jobs { - if queue, found := ssn.QueueIndex[job.Queue]; !found { + if queue, found := ssn.Queues[job.Queue]; !found { continue } else if _, existed := queues[queue.UID]; !existed { glog.V(3).Infof("Added Queue <%s> for Job <%s/%s>", @@ -103,7 +103,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) { return false } - job, found := ssn.JobIndex[task.Job] + job, found := ssn.Jobs[task.Job] if !found { return false } @@ -171,7 +171,7 @@ func preempt( ssn *framework.Session, stmt *framework.Statement, preemptor *api.TaskInfo, - nodes []*api.NodeInfo, + nodes map[string]*api.NodeInfo, filter func(*api.TaskInfo) bool, ) (bool, error) { resreq := preemptor.Resreq.Clone() diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 13c8c6011..827df02b7 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -53,7 +53,7 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { var underRequest []*api.JobInfo for _, job := range ssn.Jobs { - if queue, found := ssn.QueueIndex[job.Queue]; !found { + if queue, found := ssn.Queues[job.Queue]; !found { glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", job.Queue, job.Namespace, job.Name) continue @@ -130,7 +130,7 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { continue } - if j, found := ssn.JobIndex[task.Job]; !found { + if j, found := ssn.Jobs[task.Job]; !found { continue } else if j.Queue != job.Queue { // Clone task to avoid modify Task's status on node. diff --git a/pkg/scheduler/api/cluster_info.go b/pkg/scheduler/api/cluster_info.go index 8a3355e0c..8264f7a7c 100644 --- a/pkg/scheduler/api/cluster_info.go +++ b/pkg/scheduler/api/cluster_info.go @@ -19,13 +19,9 @@ import "fmt" // ClusterInfo is a snapshot of cluster by cache. type ClusterInfo struct { - Jobs []*JobInfo - - Nodes []*NodeInfo - - Queues []*QueueInfo - - Others []*TaskInfo + Jobs map[JobID]*JobInfo + Nodes map[string]*NodeInfo + Queues map[QueueID]*QueueInfo } func (ci ClusterInfo) String() string { diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 9b836347b..0f25c84b2 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -26,6 +26,8 @@ import ( "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" infov1 "k8s.io/client-go/informers/core/v1" @@ -43,12 +45,21 @@ import ( "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme" + kbschema "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme" kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions" kbinfov1 "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1" "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" kbapi "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api" ) +func init() { + schemeBuilder := runtime.SchemeBuilder{ + v1.AddToScheme, + } + + utilruntime.Must(schemeBuilder.AddToScheme(kbschema.Scheme)) +} + // New returns a Cache implementation. func New(config *rest.Config, schedulerName string, nsAsQueue bool) Cache { return newSchedulerCache(config, schedulerName, nsAsQueue) @@ -507,20 +518,17 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { defer sc.Mutex.Unlock() snapshot := &kbapi.ClusterInfo{ - Nodes: make([]*kbapi.NodeInfo, 0, len(sc.Nodes)), - Jobs: make([]*kbapi.JobInfo, 0, len(sc.Jobs)), - Queues: make([]*kbapi.QueueInfo, 0, len(sc.Queues)), - Others: make([]*kbapi.TaskInfo, 0, 10), + Nodes: make(map[string]*kbapi.NodeInfo), + Jobs: make(map[kbapi.JobID]*kbapi.JobInfo), + Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo), } for _, value := range sc.Nodes { - snapshot.Nodes = append(snapshot.Nodes, value.Clone()) + snapshot.Nodes[value.Name] = value.Clone() } - queues := map[kbapi.QueueID]struct{}{} for _, value := range sc.Queues { - snapshot.Queues = append(snapshot.Queues, value.Clone()) - queues[value.UID] = struct{}{} + snapshot.Queues[value.UID] = value.Clone() } for _, value := range sc.Jobs { @@ -529,21 +537,16 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo { glog.V(4).Infof("The scheduling spec of Job <%v:%s/%s> is nil, ignore it.", value.UID, value.Namespace, value.Name) - // Also tracing the running task assigned by other scheduler. - for _, task := range value.TaskStatusIndex[kbapi.Running] { - snapshot.Others = append(snapshot.Others, task.Clone()) - } - continue } - if _, found := queues[value.Queue]; !found { + if _, found := snapshot.Queues[value.Queue]; !found { glog.V(3).Infof("The Queue <%v> of Job <%v> does not exist, ignore it.", value.Queue, value.UID) continue } - snapshot.Jobs = append(snapshot.Jobs, value.Clone()) + snapshot.Jobs[value.UID] = value.Clone() } glog.V(3).Infof("There are <%d> Jobs and <%d> Queues in total for scheduling.", diff --git a/pkg/scheduler/framework/session.go b/pkg/scheduler/framework/session.go index eb37db55b..df63c8795 100644 --- a/pkg/scheduler/framework/session.go +++ b/pkg/scheduler/framework/session.go @@ -37,15 +37,11 @@ type Session struct { cache cache.Cache - Jobs []*api.JobInfo - JobIndex map[api.JobID]*api.JobInfo - Nodes []*api.NodeInfo - NodeIndex map[string]*api.NodeInfo - Queues []*api.QueueInfo - QueueIndex map[api.QueueID]*api.QueueInfo - Others []*api.TaskInfo - Backlog []*api.JobInfo - Tiers []conf.Tier + Jobs map[api.JobID]*api.JobInfo + Nodes map[string]*api.NodeInfo + Queues map[api.QueueID]*api.QueueInfo + Backlog []*api.JobInfo + Tiers []conf.Tier plugins map[string]Plugin eventHandlers []*EventHandler @@ -63,11 +59,12 @@ type Session struct { func openSession(cache cache.Cache) *Session { ssn := &Session{ - UID: uuid.NewUUID(), - cache: cache, - JobIndex: map[api.JobID]*api.JobInfo{}, - NodeIndex: map[string]*api.NodeInfo{}, - QueueIndex: map[api.QueueID]*api.QueueInfo{}, + UID: uuid.NewUUID(), + cache: cache, + + Jobs: map[api.JobID]*api.JobInfo{}, + Nodes: map[string]*api.NodeInfo{}, + Queues: map[api.QueueID]*api.QueueInfo{}, plugins: map[string]Plugin{}, jobOrderFns: map[string]api.CompareFn{}, @@ -84,7 +81,8 @@ func openSession(cache cache.Cache) *Session { snapshot := cache.Snapshot() - for _, job := range snapshot.Jobs { + ssn.Jobs = snapshot.Jobs + for _, job := range ssn.Jobs { if vjr := ssn.JobValid(job); vjr != nil { if !vjr.Pass { jc := &v1alpha1.PodGroupCondition{ @@ -101,27 +99,12 @@ func openSession(cache cache.Cache) *Session { } } - continue + delete(ssn.Jobs, job.UID) } - - ssn.Jobs = append(ssn.Jobs, job) - } - - for _, job := range ssn.Jobs { - ssn.JobIndex[job.UID] = job } ssn.Nodes = snapshot.Nodes - for _, node := range ssn.Nodes { - ssn.NodeIndex[node.Name] = node - } - ssn.Queues = snapshot.Queues - for _, queue := range ssn.Queues { - ssn.QueueIndex[queue.UID] = queue - } - - ssn.Others = snapshot.Others glog.V(3).Infof("Open Session %v with <%d> Job and <%d> Queues", ssn.UID, len(ssn.Jobs), len(ssn.Queues)) @@ -146,9 +129,7 @@ func closeSession(ssn *Session) { } ssn.Jobs = nil - ssn.JobIndex = nil ssn.Nodes = nil - ssn.NodeIndex = nil ssn.Backlog = nil ssn.plugins = nil ssn.eventHandlers = nil @@ -206,7 +187,7 @@ func (ssn *Session) Statement() *Statement { func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { // Only update status in session - job, found := ssn.JobIndex[task.Job] + job, found := ssn.Jobs[task.Job] if found { if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", @@ -219,7 +200,7 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error { task.NodeName = hostname - if node, found := ssn.NodeIndex[hostname]; found { + if node, found := ssn.Nodes[hostname]; found { if err := node.AddTask(task); err != nil { glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v", task.Namespace, task.Name, hostname, ssn.UID, err) @@ -248,7 +229,7 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error { } // Only update status in session - job, found := ssn.JobIndex[task.Job] + job, found := ssn.Jobs[task.Job] if found { if err := job.UpdateTaskStatus(task, api.Allocated); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", @@ -261,7 +242,7 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error { task.NodeName = hostname - if node, found := ssn.NodeIndex[hostname]; found { + if node, found := ssn.Nodes[hostname]; found { if err := node.AddTask(task); err != nil { glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v", task.Namespace, task.Name, hostname, ssn.UID, err) @@ -304,7 +285,7 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error { } // Update status in session - if job, found := ssn.JobIndex[task.Job]; found { + if job, found := ssn.Jobs[task.Job]; found { if err := job.UpdateTaskStatus(task, api.Binding); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", task.Namespace, task.Name, api.Binding, ssn.UID, err) @@ -323,7 +304,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { } // Update status in session - job, found := ssn.JobIndex[reclaimee.Job] + job, found := ssn.Jobs[reclaimee.Job] if found { if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", @@ -335,7 +316,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { } // Update task in node. - if node, found := ssn.NodeIndex[reclaimee.NodeName]; found { + if node, found := ssn.Nodes[reclaimee.NodeName]; found { if err := node.UpdateTask(reclaimee); err != nil { glog.Errorf("Failed to update task <%v/%v> in Session <%v>: %v", reclaimee.Namespace, reclaimee.Name, ssn.UID, err) @@ -355,7 +336,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error { // UpdateJobStatus update job condition accordingly. func (ssn *Session) UpdateJobCondition(jobInfo *api.JobInfo, cond *v1alpha1.PodGroupCondition) error { - job, ok := ssn.JobIndex[jobInfo.UID] + job, ok := ssn.Jobs[jobInfo.UID] if !ok { return fmt.Errorf("failed to find job <%s/%s>", jobInfo.Namespace, jobInfo.Name) } diff --git a/pkg/scheduler/framework/statement.go b/pkg/scheduler/framework/statement.go index 5435a2d42..9ed6ea1fd 100644 --- a/pkg/scheduler/framework/statement.go +++ b/pkg/scheduler/framework/statement.go @@ -34,7 +34,7 @@ type operation struct { func (s *Statement) Evict(reclaimee *api.TaskInfo, reason string) error { // Update status in session - job, found := s.ssn.JobIndex[reclaimee.Job] + job, found := s.ssn.Jobs[reclaimee.Job] if found { if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", @@ -46,7 +46,7 @@ func (s *Statement) Evict(reclaimee *api.TaskInfo, reason string) error { } // Update task in node. - if node, found := s.ssn.NodeIndex[reclaimee.NodeName]; found { + if node, found := s.ssn.Nodes[reclaimee.NodeName]; found { node.UpdateTask(reclaimee) } @@ -80,7 +80,7 @@ func (s *Statement) evict(reclaimee *api.TaskInfo, reason string) error { func (s *Statement) unevict(reclaimee *api.TaskInfo, reason string) error { // Update status in session - job, found := s.ssn.JobIndex[reclaimee.Job] + job, found := s.ssn.Jobs[reclaimee.Job] if found { if err := job.UpdateTaskStatus(reclaimee, api.Running); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", @@ -92,7 +92,7 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo, reason string) error { } // Update task in node. - if node, found := s.ssn.NodeIndex[reclaimee.NodeName]; found { + if node, found := s.ssn.Nodes[reclaimee.NodeName]; found { node.AddTask(reclaimee) } @@ -109,7 +109,7 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo, reason string) error { func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error { // Only update status in session - job, found := s.ssn.JobIndex[task.Job] + job, found := s.ssn.Jobs[task.Job] if found { if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", @@ -122,7 +122,7 @@ func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error { task.NodeName = hostname - if node, found := s.ssn.NodeIndex[hostname]; found { + if node, found := s.ssn.Nodes[hostname]; found { if err := node.AddTask(task); err != nil { glog.Errorf("Failed to pipeline task <%v/%v> to node <%v> in Session <%v>: %v", task.Namespace, task.Name, hostname, s.ssn.UID, err) @@ -155,7 +155,7 @@ func (s *Statement) pipeline(task *api.TaskInfo) { func (s *Statement) unpipeline(task *api.TaskInfo) error { // Only update status in session - job, found := s.ssn.JobIndex[task.Job] + job, found := s.ssn.Jobs[task.Job] if found { if err := job.UpdateTaskStatus(task, api.Pending); err != nil { glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v", @@ -168,7 +168,7 @@ func (s *Statement) unpipeline(task *api.TaskInfo) error { hostname := task.NodeName - if node, found := s.ssn.NodeIndex[hostname]; found { + if node, found := s.ssn.Nodes[hostname]; found { if err := node.RemoveTask(task); err != nil { glog.Errorf("Failed to pipeline task <%v/%v> to node <%v> in Session <%v>: %v", task.Namespace, task.Name, hostname, s.ssn.UID, err) diff --git a/pkg/scheduler/plugins/gang/gang.go b/pkg/scheduler/plugins/gang/gang.go index 314114fc5..ced2a9a64 100644 --- a/pkg/scheduler/plugins/gang/gang.go +++ b/pkg/scheduler/plugins/gang/gang.go @@ -105,7 +105,7 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) { var victims []*api.TaskInfo for _, preemptee := range preemptees { - job := ssn.JobIndex[preemptee.Job] + job := ssn.Jobs[preemptee.Job] occupid := readyTaskNum(job) preemptable := job.MinAvailable <= occupid-1 diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index 5da6eeb4e..d19090423 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -93,7 +93,7 @@ type cachedNodeInfo struct { } func (c *cachedNodeInfo) GetNodeInfo(name string) (*v1.Node, error) { - node, found := c.session.NodeIndex[name] + node, found := c.session.Nodes[name] if !found { return nil, fmt.Errorf("failed to find node <%s>", name) } diff --git a/pkg/scheduler/plugins/prioritize/prioritize.go b/pkg/scheduler/plugins/prioritize/prioritize.go index 8b797139e..c5f3cf268 100644 --- a/pkg/scheduler/plugins/prioritize/prioritize.go +++ b/pkg/scheduler/plugins/prioritize/prioritize.go @@ -44,7 +44,7 @@ func getInterPodAffinityScore(name string, interPodAffinityScore schedulerapi.Ho return 0 } -func generateNodeMapAndSlice(nodes []*api.NodeInfo) (map[string]*cache.NodeInfo, []*v1.Node) { +func generateNodeMapAndSlice(nodes map[string]*api.NodeInfo) (map[string]*cache.NodeInfo, []*v1.Node) { var nodeMap map[string]*cache.NodeInfo var nodeSlice []*v1.Node nodeMap = make(map[string]*cache.NodeInfo) @@ -62,7 +62,7 @@ type cachedNodeInfo struct { } func (c *cachedNodeInfo) GetNodeInfo(name string) (*v1.Node, error) { - node, found := c.session.NodeIndex[name] + node, found := c.session.Nodes[name] if !found { for _, cacheNode := range c.session.Nodes { pods := cacheNode.Pods() diff --git a/pkg/scheduler/plugins/proportion/proportion.go b/pkg/scheduler/plugins/proportion/proportion.go index 4a0c5a6c1..8ee1be899 100644 --- a/pkg/scheduler/plugins/proportion/proportion.go +++ b/pkg/scheduler/plugins/proportion/proportion.go @@ -57,11 +57,6 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { pp.totalResource.Add(n.Allocatable) } - // Also remove the resource used by other scheduler. - for _, task := range ssn.Others { - pp.totalResource.Sub(task.Resreq) - } - glog.V(4).Infof("The total resource is <%v>", pp.totalResource) // Build attributes for Queues. @@ -69,7 +64,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { glog.V(4).Infof("Considering Job <%s/%s>.", job.Namespace, job.Name) if _, found := pp.queueOpts[job.Queue]; !found { - queue := ssn.QueueIndex[job.Queue] + queue := ssn.Queues[job.Queue] attr := &queueAttr{ queueID: queue.UID, name: queue.Name, @@ -163,7 +158,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { allocations := map[api.QueueID]*api.Resource{} for _, reclaimee := range reclaimees { - job := ssn.JobIndex[reclaimee.Job] + job := ssn.Jobs[reclaimee.Job] attr := pp.queueOpts[job.Queue] if _, found := allocations[job.Queue]; !found { @@ -189,13 +184,16 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { queue := obj.(*api.QueueInfo) attr := pp.queueOpts[queue.UID] + glog.V(4).Infof("Queue <%v> is deserved <%v>, allocated <%v>", + queue.Name, attr.deserved, attr.allocated) + return attr.deserved.LessEqual(attr.allocated) }) // Register event handlers. ssn.AddEventHandler(&framework.EventHandler{ AllocateFunc: func(event *framework.Event) { - job := ssn.JobIndex[event.Task.Job] + job := ssn.Jobs[event.Task.Job] attr := pp.queueOpts[job.Queue] attr.allocated.Add(event.Task.Resreq) @@ -205,7 +203,7 @@ func (pp *proportionPlugin) OnSessionOpen(ssn *framework.Session) { event.Task.Namespace, event.Task.Name, event.Task.Resreq, attr.share) }, DeallocateFunc: func(event *framework.Event) { - job := ssn.JobIndex[event.Task.Job] + job := ssn.Jobs[event.Task.Job] attr := pp.queueOpts[job.Queue] attr.allocated.Sub(event.Task.Resreq)