
Commit

Merge pull request kubernetes-retired#468 from adam-marek/detailed-resource-info

Detailed 'unschedulable' events
k8s-ci-robot authored Nov 16, 2018
2 parents 74b3a0a + 0f8bd08 commit c9f8190
Showing 6 changed files with 98 additions and 17 deletions.
11 changes: 11 additions & 0 deletions pkg/scheduler/actions/allocate/allocate.go
@@ -102,6 +102,13 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>",
len(ssn.Nodes), job.Namespace, job.Name)

// Any task that doesn't fit will be the last one processed
// within this loop context, so any existing contents of
// NodesFitDelta are for tasks that eventually did fit on a
// node.
if len(job.NodesFitDelta) > 0 {
job.NodesFitDelta = make(api.NodeResourceMap)
}
for _, node := range ssn.Nodes {
glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
@@ -124,6 +131,10 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
}
assigned = true
break
} else {
// Store information about the missing resources.
job.NodesFitDelta[node.Name] = node.Idle.Clone()
job.NodesFitDelta[node.Name].FitDelta(task.Resreq)
}

// Allocate releasing resource to the task if any.
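
As a rough, self-contained sketch of the bookkeeping in the hunk above (simplified types and hypothetical values, not the kube-batch API itself): NodesFitDelta is reset before a task is tried against the nodes, and an entry is recorded for every node the task does not fit on, so at the end of the session the map describes the last task that could not be placed.

package main

import "fmt"

// Simplified stand-ins for api.Resource and api.NodeResourceMap.
type resource struct{ MilliCPU float64 }
type nodeResourceMap map[string]*resource

func main() {
    idleMilliCPU := map[string]float64{"node-a": 500, "node-b": 800} // hypothetical idle CPU per node
    taskMilliCPU := 1000.0                                           // hypothetical task request

    nodesFitDelta := make(nodeResourceMap) // reset, as in allocate.go, before trying the task
    assigned := false
    for name, idle := range idleMilliCPU {
        if idle >= taskMilliCPU {
            assigned = true
            break
        }
        // The task does not fit here: remember how far off this node was.
        nodesFitDelta[name] = &resource{MilliCPU: idle - taskMilliCPU}
    }

    if !assigned {
        for name, d := range nodesFitDelta {
            fmt.Printf("%s is short by %.0fm CPU\n", name, -d.MilliCPU)
        }
    }
}
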
63 changes: 51 additions & 12 deletions pkg/scheduler/api/job_info.go
@@ -26,6 +26,8 @@ import (
"github.com/kubernetes-sigs/kube-batch/cmd/kube-batch/app/options"
arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
"github.com/kubernetes-sigs/kube-batch/pkg/apis/utils"
"sort"
"strings"
)

type TaskID types.UID
@@ -73,9 +75,8 @@ func NewTaskInfo(pod *v1.Pod) *TaskInfo {
NodeName: pod.Spec.NodeName,
Status: getTaskStatus(pod),
Priority: 1,

Pod: pod,
Resreq: req,
Pod: pod,
Resreq: req,
}

if pod.Spec.Priority != nil {
@@ -109,6 +110,8 @@ type JobID types.UID

type tasksMap map[TaskID]*TaskInfo

type NodeResourceMap map[string]*Resource

type JobInfo struct {
UID JobID

@@ -122,6 +125,8 @@ type JobInfo struct {
NodeSelector map[string]string
MinAvailable int32

NodesFitDelta NodeResourceMap

// All tasks of the Job.
TaskStatusIndex map[TaskStatus]tasksMap
Tasks tasksMap
@@ -140,11 +145,11 @@ func NewJobInfo(uid JobID) *JobInfo {
return &JobInfo{
UID: uid,

MinAvailable: 0,
NodeSelector: make(map[string]string),

Allocated: EmptyResource(),
TotalRequest: EmptyResource(),
MinAvailable: 0,
NodeSelector: make(map[string]string),
NodesFitDelta: make(NodeResourceMap),
Allocated: EmptyResource(),
TotalRequest: EmptyResource(),

TaskStatusIndex: map[TaskStatus]tasksMap{},
Tasks: tasksMap{},
@@ -278,10 +283,11 @@ func (ji *JobInfo) Clone() *JobInfo {
Namespace: ji.Namespace,
Queue: ji.Queue,

MinAvailable: ji.MinAvailable,
NodeSelector: map[string]string{},
Allocated: ji.Allocated.Clone(),
TotalRequest: ji.TotalRequest.Clone(),
MinAvailable: ji.MinAvailable,
NodeSelector: map[string]string{},
Allocated: ji.Allocated.Clone(),
TotalRequest: ji.TotalRequest.Clone(),
NodesFitDelta: make(NodeResourceMap),

PDB: ji.PDB,
PodGroup: ji.PodGroup,
@@ -314,3 +320,36 @@ func (ji JobInfo) String() string {

return fmt.Sprintf("Job (%v): name %v, minAvailable %d", ji.UID, ji.Name, ji.MinAvailable) + res
}

// FitError returns detailed information on why a job's task failed to fit on
// each available node.
func (f *JobInfo) FitError() string {
if len(f.NodesFitDelta) == 0 {
reasonMsg := fmt.Sprintf("0 nodes are available")
return reasonMsg
}

reasons := make(map[string]int)
for _, v := range f.NodesFitDelta {
if v.Get(v1.ResourceCPU) < 0 {
reasons["cpu"]++
}
if v.Get(v1.ResourceMemory) < 0 {
reasons["memory"]++
}
if v.Get(GPUResourceName) < 0 {
reasons["GPU"]++
}
}

sortReasonsHistogram := func() []string {
reasonStrings := []string{}
for k, v := range reasons {
reasonStrings = append(reasonStrings, fmt.Sprintf("%v insufficient %v", v, k))
}
sort.Strings(reasonStrings)
return reasonStrings
}
reasonMsg := fmt.Sprintf("0/%v nodes are available, %v.", len(f.NodesFitDelta), strings.Join(sortReasonsHistogram(), ", "))
return reasonMsg
}
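
A minimal standalone illustration of the message FitError assembles, under assumed per-node deltas (negative values meaning the node fell short on that resource); it mirrors the histogram logic above rather than calling the real kube-batch types.

package main

import (
    "fmt"
    "sort"
    "strings"
)

func main() {
    // Hypothetical deltas left behind for three nodes by the allocate action.
    deltas := map[string]map[string]float64{
        "node-a": {"cpu": -500},
        "node-b": {"cpu": -200, "memory": -(1 << 30)},
        "node-c": {"memory": -(2 << 30)},
    }

    // Count how many nodes were short on each resource, as FitError does.
    reasons := map[string]int{}
    for _, d := range deltas {
        for res, v := range d {
            if v < 0 {
                reasons[res]++
            }
        }
    }

    reasonStrings := []string{}
    for k, v := range reasons {
        reasonStrings = append(reasonStrings, fmt.Sprintf("%v insufficient %v", v, k))
    }
    sort.Strings(reasonStrings)

    fmt.Printf("0/%v nodes are available, %v.\n", len(deltas), strings.Join(reasonStrings, ", "))
    // Output: 0/3 nodes are available, 2 insufficient cpu, 2 insufficient memory.
}
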
9 changes: 6 additions & 3 deletions pkg/scheduler/api/job_info_test.go
@@ -79,7 +79,8 @@ func TestAddTaskInfo(t *testing.T) {
case01_task4.UID: case01_task4,
},
},
NodeSelector: make(map[string]string),
NodeSelector: make(map[string]string),
NodesFitDelta: make(NodeResourceMap),
},
},
}
@@ -143,7 +144,8 @@ func TestDeleteTaskInfo(t *testing.T) {
Pending: {case01_task1.UID: case01_task1},
Running: {case01_task3.UID: case01_task3},
},
NodeSelector: make(map[string]string),
NodeSelector: make(map[string]string),
NodesFitDelta: make(NodeResourceMap),
},
},
{
@@ -168,7 +170,8 @@
case02_task3.UID: case02_task3,
},
},
NodeSelector: make(map[string]string),
NodeSelector: make(map[string]string),
NodesFitDelta: make(NodeResourceMap),
},
},
}
19 changes: 19 additions & 0 deletions pkg/scheduler/api/resource_info.go
@@ -109,6 +109,25 @@ func (r *Resource) Sub(rr *Resource) *Resource {
r, rr))
}

// FitDelta computes the delta between a resource object representing the
// available resources and an operand representing the resources being
// requested. Any field that is less than 0 after the operation represents
// an insufficient resource.
func (r *Resource) FitDelta(rr *Resource) *Resource {
if rr.MilliCPU > 0 {
r.MilliCPU -= rr.MilliCPU + minMilliCPU
}

if rr.Memory > 0 {
r.Memory -= rr.Memory + minMemory
}

if rr.MilliGPU > 0 {
r.MilliGPU -= rr.MilliGPU + minMilliGPU
}
return r
}

func (r *Resource) Multi(ratio float64) *Resource {
r.MilliCPU = r.MilliCPU * ratio
r.Memory = r.Memory * ratio
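
A short worked example of the arithmetic above, assuming (purely for illustration) a padding of minMilliCPU = 10; the actual min* constants are defined elsewhere in resource_info.go.

package main

import "fmt"

func main() {
    const minMilliCPU = 10.0 // assumed value, for illustration only

    idle := 500.0     // hypothetical idle milli-CPU on the node
    request := 1000.0 // hypothetical task request

    // Same CPU-field arithmetic as FitDelta: r.MilliCPU -= rr.MilliCPU + minMilliCPU
    delta := idle - (request + minMilliCPU)
    fmt.Printf("cpu delta: %.0fm\n", delta) // -510m, so FitError counts this node as "insufficient cpu"
}
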
6 changes: 6 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -550,5 +550,11 @@ func (sc *SchedulerCache) Backoff(job *arbapi.JobInfo, event arbcorev1.Event, re
return fmt.Errorf("no scheduling specification for job")
}

for _, tasks := range job.TaskStatusIndex {
for _, t := range tasks {
sc.recorder.Eventf(t.Pod, v1.EventTypeWarning, string(event), reason)
}
}

return nil
}
7 changes: 5 additions & 2 deletions pkg/scheduler/plugins/gang/gang.go
@@ -17,6 +17,7 @@ limitations under the License.
package gang

import (
"fmt"
"github.com/golang/glog"

arbcorev1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
@@ -144,8 +145,10 @@ func (gp *gangPlugin) OnSessionOpen(ssn *framework.Session) {

func (gp *gangPlugin) OnSessionClose(ssn *framework.Session) {
for _, job := range ssn.Jobs {
if len(job.TaskStatusIndex[api.Allocated]) != 0 {
ssn.Backoff(job, arbcorev1.UnschedulableEvent, "not enough resource for job")
if len(job.TaskStatusIndex[api.Pending]) != 0 {
glog.V(3).Infof("Gang: <%v/%v> allocated: %v, pending: %v", job.Namespace, job.Name, len(job.TaskStatusIndex[api.Allocated]), len(job.TaskStatusIndex[api.Pending]))
msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", len(job.TaskStatusIndex[api.Pending]), len(job.Tasks), job.FitError())
ssn.Backoff(job, arbcorev1.UnschedulableEvent, msg)
}
}
}
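
Putting the pieces together, the warning that Backoff now attaches to each of the job's pods would read roughly like the output below (hypothetical counts; the wording comes from the Sprintf in OnSessionClose combined with JobInfo.FitError).

package main

import "fmt"

func main() {
    pending, total, nodes := 2, 5, 3 // hypothetical job and cluster state
    fitErr := fmt.Sprintf("0/%v nodes are available, %v.", nodes, "3 insufficient cpu")
    msg := fmt.Sprintf("%v/%v tasks in gang unschedulable: %v", pending, total, fitErr)
    fmt.Println(msg)
    // 2/5 tasks in gang unschedulable: 0/3 nodes are available, 3 insufficient cpu.
}
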
