
Ignore nodes if out of sync. #26

Merged · 2 commits · Jun 11, 2019
88 changes: 72 additions & 16 deletions pkg/scheduler/api/node_info.go
@@ -19,6 +19,8 @@ package api
import (
"fmt"

"github.com/golang/glog"

v1 "k8s.io/api/core/v1"
)

@@ -27,6 +29,9 @@ type NodeInfo struct {
Name string
Node *v1.Node

// The state of the node
State NodeState

// The releasing resource on that node
Releasing *Resource
// The idle resource on that node
@@ -44,10 +49,18 @@ type NodeInfo struct {
Other interface{}
}

// NodeState defines the current state of the node.
type NodeState struct {
Phase NodePhase
Reason string
}

// NewNodeInfo is used to create new nodeInfo object
func NewNodeInfo(node *v1.Node) *NodeInfo {
var ni *NodeInfo

if node == nil {
return &NodeInfo{
ni = &NodeInfo{
Releasing: EmptyResource(),
Idle: EmptyResource(),
Used: EmptyResource(),
@@ -57,21 +70,25 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {

Tasks: make(map[TaskID]*TaskInfo),
}
}

return &NodeInfo{
Name: node.Name,
Node: node,
} else {
ni = &NodeInfo{
Name: node.Name,
Node: node,

Releasing: EmptyResource(),
Idle: NewResource(node.Status.Allocatable),
Used: EmptyResource(),
Releasing: EmptyResource(),
Idle: NewResource(node.Status.Allocatable),
Used: EmptyResource(),

Allocatable: NewResource(node.Status.Allocatable),
Capability: NewResource(node.Status.Capacity),
Allocatable: NewResource(node.Status.Allocatable),
Capability: NewResource(node.Status.Capacity),

Tasks: make(map[TaskID]*TaskInfo),
Tasks: make(map[TaskID]*TaskInfo),
}
}

ni.setNodeState(node)

return ni
}

// Clone used to clone nodeInfo Object
@@ -85,8 +102,47 @@ func (ni *NodeInfo) Clone() *NodeInfo {
return res
}

// Ready returns whether node is ready for scheduling
func (ni *NodeInfo) Ready() bool {
return ni.State.Phase == Ready
}

func (ni *NodeInfo) setNodeState(node *v1.Node) {
// If node is nil, the node is un-initialized in cache
if node == nil {
ni.State = NodeState{
Phase: NotReady,
Reason: "UnInitialized",
}
return
}

// set NodeState according to resources
if !ni.Used.LessEqual(NewResource(node.Status.Allocatable)) {
ni.State = NodeState{
Phase: NotReady,
Reason: "OutOfSync",
}
return
}

// Node is ready (ignore node conditions because of taint/toleration)
ni.State = NodeState{
Phase: Ready,
Reason: "",
}
}

// SetNode sets kubernetes node object to nodeInfo object
func (ni *NodeInfo) SetNode(node *v1.Node) {
ni.setNodeState(node)

if !ni.Ready() {
glog.Warningf("Failed to set node info, phase: %s, reason: %s",
ni.State.Phase, ni.State.Reason)
return
}

ni.Name = node.Name
ni.Node = node

@@ -176,16 +232,16 @@ func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error {

// String returns nodeInfo details in string format
func (ni NodeInfo) String() string {
res := ""
tasks := ""

i := 0
for _, task := range ni.Tasks {
res = res + fmt.Sprintf("\n\t %d: %v", i, task)
tasks = tasks + fmt.Sprintf("\n\t %d: %v", i, task)
i++
}

return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>, taints <%v>%s",
ni.Name, ni.Idle, ni.Used, ni.Releasing, ni.Node.Spec.Taints, res)
return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>, state <phase %s, reaseon %s>, taints <%v>%s",
ni.Name, ni.Idle, ni.Used, ni.Releasing, ni.State.Phase, ni.State.Reason, ni.Node.Spec.Taints, tasks)

}

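For readers following the diff, here is a small usage sketch of the new state gate. It is not part of the PR; it assumes it sits inside the pkg/scheduler/api package shown above, and the node is built inline for self-containment rather than through the package's test helpers.

// Usage sketch only (not part of this PR): shows how a node whose tracked
// usage exceeds its reported allocatable resources is flagged as OutOfSync.
package api

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func exampleOutOfSyncNode() {
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "n1"},
		Status: v1.NodeStatus{
			Allocatable: v1.ResourceList{
				v1.ResourceCPU:    resource.MustParse("2000m"),
				v1.ResourceMemory: resource.MustParse("4G"),
			},
			Capacity: v1.ResourceList{
				v1.ResourceCPU:    resource.MustParse("2000m"),
				v1.ResourceMemory: resource.MustParse("4G"),
			},
		},
	}

	ni := NewNodeInfo(node)
	fmt.Println(ni.Ready()) // true: nothing used yet, so Used <= Allocatable

	// Simulate the cache falling out of sync: more CPU is tracked as used
	// than the node currently reports as allocatable.
	ni.Used = NewResource(v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("3000m"),
		v1.ResourceMemory: resource.MustParse("1G"),
	})
	ni.SetNode(node)        // setNodeState marks the node NotReady/OutOfSync
	fmt.Println(ni.Ready()) // false: Snapshot() will now skip this node
}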
2 changes: 2 additions & 0 deletions pkg/scheduler/api/node_info_test.go
@@ -56,6 +56,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
Releasing: EmptyResource(),
Allocatable: buildResource("8000m", "10G"),
Capability: buildResource("8000m", "10G"),
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p2": NewTaskInfo(case01Pod2),
@@ -106,6 +107,7 @@ func TestNodeInfo_RemovePod(t *testing.T) {
Releasing: EmptyResource(),
Allocatable: buildResource("8000m", "10G"),
Capability: buildResource("8000m", "10G"),
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p3": NewTaskInfo(case01Pod3),
21 changes: 21 additions & 0 deletions pkg/scheduler/api/types.go
@@ -78,6 +78,27 @@ func (ts TaskStatus) String() string {
}
}

// NodePhase defines the phase of the node
type NodePhase int

const (
// Ready means the node is ready for scheduling
Ready NodePhase = 1 << iota
// NotReady means the node is not ready for scheduling
NotReady
)

func (np NodePhase) String() string {
switch np {
case Ready:
return "Ready"
case NotReady:
return "NotReady"
}

return "Unknown"
}

// validateStatusUpdate validates whether the status transfer is valid.
func validateStatusUpdate(oldStatus, newStatus TaskStatus) error {
return nil
4 changes: 4 additions & 0 deletions pkg/scheduler/cache/cache.go
@@ -549,6 +549,10 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
}

for _, value := range sc.Nodes {
if !value.Ready() {
Review comment:
Do we always track unready nodes? I am wondering whether the scheduler should delete a node after a timeout?

Author reply:
Good point! We should delete the node if there is no pod on it when the timeout expires :) I'm going to open a separate PR for that.

continue
}

snapshot.Nodes[value.Name] = value.Clone()
}

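The review thread above leaves a follow-up open: dropping nodes that stay unready. Below is a rough sketch of what that could look like. It is not part of this PR, every name in it (cleanUnreadyNodes, lastReady, timeout) is hypothetical, and it is written against the api types for self-containment; a real implementation would need the cache's usual locking around the node map.

// Hypothetical follow-up sketch (not in this PR): periodically drop nodes
// that have stayed NotReady past a timeout and no longer hold any tasks.
package api

import "time"

func cleanUnreadyNodes(nodes map[string]*NodeInfo, lastReady map[string]time.Time, timeout time.Duration) {
	now := time.Now()
	for name, node := range nodes {
		if node.Ready() {
			lastReady[name] = now // healthy or recovered node: reset its clock
			continue
		}
		if _, seen := lastReady[name]; !seen {
			lastReady[name] = now // first time it is seen unready: start the clock
		}
		// Evict only nodes that have been unready past the timeout and are
		// empty; nodes still holding tasks keep being tracked, as discussed.
		if now.Sub(lastReady[name]) > timeout && len(node.Tasks) == 0 {
			delete(nodes, name)
			delete(lastReady, name)
		}
	}
}

In practice this would more likely live on SchedulerCache and run from one of its existing periodic workers, as the author's planned follow-up PR suggests.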