Skip to content
This repository has been archived by the owner on May 25, 2023. It is now read-only.

Commit

Permalink
Merge pull request #607 from k82cn/kb_606
Browse files Browse the repository at this point in the history
Fixed flaky test.
  • Loading branch information
k8s-ci-robot authored Feb 28, 2019
2 parents e1204f3 + 4635fab commit 88f75d4
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 90 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}

if queue, found := ssn.QueueIndex[job.Queue]; found {
if queue, found := ssn.Queues[job.Queue]; found {
queues.Push(queue)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
queues := map[api.QueueID]*api.QueueInfo{}

for _, job := range ssn.Jobs {
if queue, found := ssn.QueueIndex[job.Queue]; !found {
if queue, found := ssn.Queues[job.Queue]; !found {
continue
} else if _, existed := queues[queue.UID]; !existed {
glog.V(3).Infof("Added Queue <%s> for Job <%s/%s>",
Expand Down Expand Up @@ -103,7 +103,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
return false
}

job, found := ssn.JobIndex[task.Job]
job, found := ssn.Jobs[task.Job]
if !found {
return false
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func preempt(
ssn *framework.Session,
stmt *framework.Statement,
preemptor *api.TaskInfo,
nodes []*api.NodeInfo,
nodes map[string]*api.NodeInfo,
filter func(*api.TaskInfo) bool,
) (bool, error) {
resreq := preemptor.Resreq.Clone()
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {

var underRequest []*api.JobInfo
for _, job := range ssn.Jobs {
if queue, found := ssn.QueueIndex[job.Queue]; !found {
if queue, found := ssn.Queues[job.Queue]; !found {
glog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
continue
Expand Down Expand Up @@ -130,7 +130,7 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
continue
}

if j, found := ssn.JobIndex[task.Job]; !found {
if j, found := ssn.Jobs[task.Job]; !found {
continue
} else if j.Queue != job.Queue {
// Clone task to avoid modify Task's status on node.
Expand Down
10 changes: 3 additions & 7 deletions pkg/scheduler/api/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@ import "fmt"

// ClusterInfo is a snapshot of cluster by cache.
type ClusterInfo struct {
Jobs []*JobInfo

Nodes []*NodeInfo

Queues []*QueueInfo

Others []*TaskInfo
Jobs map[JobID]*JobInfo
Nodes map[string]*NodeInfo
Queues map[QueueID]*QueueInfo
}

func (ci ClusterInfo) String() string {
Expand Down
33 changes: 18 additions & 15 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
infov1 "k8s.io/client-go/informers/core/v1"
Expand All @@ -43,12 +45,21 @@ import (
"github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned"
"github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme"
kbschema "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned/scheme"
kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions"
kbinfov1 "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
kbapi "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
)

func init() {
schemeBuilder := runtime.SchemeBuilder{
v1.AddToScheme,
}

utilruntime.Must(schemeBuilder.AddToScheme(kbschema.Scheme))
}

// New returns a Cache implementation.
func New(config *rest.Config, schedulerName string, nsAsQueue bool) Cache {
return newSchedulerCache(config, schedulerName, nsAsQueue)
Expand Down Expand Up @@ -507,20 +518,17 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
defer sc.Mutex.Unlock()

snapshot := &kbapi.ClusterInfo{
Nodes: make([]*kbapi.NodeInfo, 0, len(sc.Nodes)),
Jobs: make([]*kbapi.JobInfo, 0, len(sc.Jobs)),
Queues: make([]*kbapi.QueueInfo, 0, len(sc.Queues)),
Others: make([]*kbapi.TaskInfo, 0, 10),
Nodes: make(map[string]*kbapi.NodeInfo),
Jobs: make(map[kbapi.JobID]*kbapi.JobInfo),
Queues: make(map[kbapi.QueueID]*kbapi.QueueInfo),
}

for _, value := range sc.Nodes {
snapshot.Nodes = append(snapshot.Nodes, value.Clone())
snapshot.Nodes[value.Name] = value.Clone()
}

queues := map[kbapi.QueueID]struct{}{}
for _, value := range sc.Queues {
snapshot.Queues = append(snapshot.Queues, value.Clone())
queues[value.UID] = struct{}{}
snapshot.Queues[value.UID] = value.Clone()
}

for _, value := range sc.Jobs {
Expand All @@ -529,21 +537,16 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
glog.V(4).Infof("The scheduling spec of Job <%v:%s/%s> is nil, ignore it.",
value.UID, value.Namespace, value.Name)

// Also tracing the running task assigned by other scheduler.
for _, task := range value.TaskStatusIndex[kbapi.Running] {
snapshot.Others = append(snapshot.Others, task.Clone())
}

continue
}

if _, found := queues[value.Queue]; !found {
if _, found := snapshot.Queues[value.Queue]; !found {
glog.V(3).Infof("The Queue <%v> of Job <%v> does not exist, ignore it.",
value.Queue, value.UID)
continue
}

snapshot.Jobs = append(snapshot.Jobs, value.Clone())
snapshot.Jobs[value.UID] = value.Clone()
}

glog.V(3).Infof("There are <%d> Jobs and <%d> Queues in total for scheduling.",
Expand Down
63 changes: 22 additions & 41 deletions pkg/scheduler/framework/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,11 @@ type Session struct {

cache cache.Cache

Jobs []*api.JobInfo
JobIndex map[api.JobID]*api.JobInfo
Nodes []*api.NodeInfo
NodeIndex map[string]*api.NodeInfo
Queues []*api.QueueInfo
QueueIndex map[api.QueueID]*api.QueueInfo
Others []*api.TaskInfo
Backlog []*api.JobInfo
Tiers []conf.Tier
Jobs map[api.JobID]*api.JobInfo
Nodes map[string]*api.NodeInfo
Queues map[api.QueueID]*api.QueueInfo
Backlog []*api.JobInfo
Tiers []conf.Tier

plugins map[string]Plugin
eventHandlers []*EventHandler
Expand All @@ -63,11 +59,12 @@ type Session struct {

func openSession(cache cache.Cache) *Session {
ssn := &Session{
UID: uuid.NewUUID(),
cache: cache,
JobIndex: map[api.JobID]*api.JobInfo{},
NodeIndex: map[string]*api.NodeInfo{},
QueueIndex: map[api.QueueID]*api.QueueInfo{},
UID: uuid.NewUUID(),
cache: cache,

Jobs: map[api.JobID]*api.JobInfo{},
Nodes: map[string]*api.NodeInfo{},
Queues: map[api.QueueID]*api.QueueInfo{},

plugins: map[string]Plugin{},
jobOrderFns: map[string]api.CompareFn{},
Expand All @@ -84,7 +81,8 @@ func openSession(cache cache.Cache) *Session {

snapshot := cache.Snapshot()

for _, job := range snapshot.Jobs {
ssn.Jobs = snapshot.Jobs
for _, job := range ssn.Jobs {
if vjr := ssn.JobValid(job); vjr != nil {
if !vjr.Pass {
jc := &v1alpha1.PodGroupCondition{
Expand All @@ -101,27 +99,12 @@ func openSession(cache cache.Cache) *Session {
}
}

continue
delete(ssn.Jobs, job.UID)
}

ssn.Jobs = append(ssn.Jobs, job)
}

for _, job := range ssn.Jobs {
ssn.JobIndex[job.UID] = job
}

ssn.Nodes = snapshot.Nodes
for _, node := range ssn.Nodes {
ssn.NodeIndex[node.Name] = node
}

ssn.Queues = snapshot.Queues
for _, queue := range ssn.Queues {
ssn.QueueIndex[queue.UID] = queue
}

ssn.Others = snapshot.Others

glog.V(3).Infof("Open Session %v with <%d> Job and <%d> Queues",
ssn.UID, len(ssn.Jobs), len(ssn.Queues))
Expand All @@ -146,9 +129,7 @@ func closeSession(ssn *Session) {
}

ssn.Jobs = nil
ssn.JobIndex = nil
ssn.Nodes = nil
ssn.NodeIndex = nil
ssn.Backlog = nil
ssn.plugins = nil
ssn.eventHandlers = nil
Expand Down Expand Up @@ -206,7 +187,7 @@ func (ssn *Session) Statement() *Statement {

func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error {
// Only update status in session
job, found := ssn.JobIndex[task.Job]
job, found := ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
Expand All @@ -219,7 +200,7 @@ func (ssn *Session) Pipeline(task *api.TaskInfo, hostname string) error {

task.NodeName = hostname

if node, found := ssn.NodeIndex[hostname]; found {
if node, found := ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
task.Namespace, task.Name, hostname, ssn.UID, err)
Expand Down Expand Up @@ -248,7 +229,7 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error {
}

// Only update status in session
job, found := ssn.JobIndex[task.Job]
job, found := ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
Expand All @@ -261,7 +242,7 @@ func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error {

task.NodeName = hostname

if node, found := ssn.NodeIndex[hostname]; found {
if node, found := ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
task.Namespace, task.Name, hostname, ssn.UID, err)
Expand Down Expand Up @@ -304,7 +285,7 @@ func (ssn *Session) dispatch(task *api.TaskInfo) error {
}

// Update status in session
if job, found := ssn.JobIndex[task.Job]; found {
if job, found := ssn.Jobs[task.Job]; found {
if err := job.UpdateTaskStatus(task, api.Binding); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
task.Namespace, task.Name, api.Binding, ssn.UID, err)
Expand All @@ -323,7 +304,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error {
}

// Update status in session
job, found := ssn.JobIndex[reclaimee.Job]
job, found := ssn.Jobs[reclaimee.Job]
if found {
if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
Expand All @@ -335,7 +316,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error {
}

// Update task in node.
if node, found := ssn.NodeIndex[reclaimee.NodeName]; found {
if node, found := ssn.Nodes[reclaimee.NodeName]; found {
if err := node.UpdateTask(reclaimee); err != nil {
glog.Errorf("Failed to update task <%v/%v> in Session <%v>: %v",
reclaimee.Namespace, reclaimee.Name, ssn.UID, err)
Expand All @@ -355,7 +336,7 @@ func (ssn *Session) Evict(reclaimee *api.TaskInfo, reason string) error {

// UpdateJobStatus update job condition accordingly.
func (ssn *Session) UpdateJobCondition(jobInfo *api.JobInfo, cond *v1alpha1.PodGroupCondition) error {
job, ok := ssn.JobIndex[jobInfo.UID]
job, ok := ssn.Jobs[jobInfo.UID]
if !ok {
return fmt.Errorf("failed to find job <%s/%s>", jobInfo.Namespace, jobInfo.Name)
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/scheduler/framework/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type operation struct {

func (s *Statement) Evict(reclaimee *api.TaskInfo, reason string) error {
// Update status in session
job, found := s.ssn.JobIndex[reclaimee.Job]
job, found := s.ssn.Jobs[reclaimee.Job]
if found {
if err := job.UpdateTaskStatus(reclaimee, api.Releasing); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
Expand All @@ -46,7 +46,7 @@ func (s *Statement) Evict(reclaimee *api.TaskInfo, reason string) error {
}

// Update task in node.
if node, found := s.ssn.NodeIndex[reclaimee.NodeName]; found {
if node, found := s.ssn.Nodes[reclaimee.NodeName]; found {
node.UpdateTask(reclaimee)
}

Expand Down Expand Up @@ -80,7 +80,7 @@ func (s *Statement) evict(reclaimee *api.TaskInfo, reason string) error {

func (s *Statement) unevict(reclaimee *api.TaskInfo, reason string) error {
// Update status in session
job, found := s.ssn.JobIndex[reclaimee.Job]
job, found := s.ssn.Jobs[reclaimee.Job]
if found {
if err := job.UpdateTaskStatus(reclaimee, api.Running); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
Expand All @@ -92,7 +92,7 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo, reason string) error {
}

// Update task in node.
if node, found := s.ssn.NodeIndex[reclaimee.NodeName]; found {
if node, found := s.ssn.Nodes[reclaimee.NodeName]; found {
node.AddTask(reclaimee)
}

Expand All @@ -109,7 +109,7 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo, reason string) error {

func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
// Only update status in session
job, found := s.ssn.JobIndex[task.Job]
job, found := s.ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
Expand All @@ -122,7 +122,7 @@ func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {

task.NodeName = hostname

if node, found := s.ssn.NodeIndex[hostname]; found {
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
glog.Errorf("Failed to pipeline task <%v/%v> to node <%v> in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
Expand Down Expand Up @@ -155,7 +155,7 @@ func (s *Statement) pipeline(task *api.TaskInfo) {

func (s *Statement) unpipeline(task *api.TaskInfo) error {
// Only update status in session
job, found := s.ssn.JobIndex[task.Job]
job, found := s.ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
Expand All @@ -168,7 +168,7 @@ func (s *Statement) unpipeline(task *api.TaskInfo) error {

hostname := task.NodeName

if node, found := s.ssn.NodeIndex[hostname]; found {
if node, found := s.ssn.Nodes[hostname]; found {
if err := node.RemoveTask(task); err != nil {
glog.Errorf("Failed to pipeline task <%v/%v> to node <%v> in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/gang/gang.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {
var victims []*api.TaskInfo

for _, preemptee := range preemptees {
job := ssn.JobIndex[preemptee.Job]
job := ssn.Jobs[preemptee.Job]
occupid := readyTaskNum(job)
preemptable := job.MinAvailable <= occupid-1

Expand Down
Loading

0 comments on commit 88f75d4

Please sign in to comment.