Skip to content

Commit

Permalink
scheduler orchestrates candidates using resource preference
Browse files Browse the repository at this point in the history
  • Loading branch information
zc authored and CMGS committed Mar 19, 2020
1 parent fc62687 commit 9faf9ea
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 44 deletions.
22 changes: 13 additions & 9 deletions cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,19 @@ func getNodesInfo(nodes map[string]*types.Node, cpu float64, memory, storage int
InitVolumeMap: node.InitVolume,
MemCap: node.MemCap,
StorageCap: node.AvailableStorage(),
CPURate: cpu / float64(len(node.InitCPU)),
MemRate: float64(memory) / float64(node.InitMemCap),
StorageRate: float64(storage) / float64(node.InitStorageCap),
CPUUsed: node.CPUUsed / float64(len(node.InitCPU)),
MemUsage: 1.0 - float64(node.MemCap)/float64(node.InitMemCap),
StorageUsage: node.StorageUsage(),
Capacity: 0,
Count: 0,
Deploy: 0,
Rates: map[types.ResourceType]float64{
types.ResourceCPU: cpu / float64(len(node.InitCPU)),
types.ResourceMemory: float64(memory) / float64(node.InitMemCap),
types.ResourceStorage: float64(storage) / float64(node.InitStorageCap),
},
Usages: map[types.ResourceType]float64{
types.ResourceCPU: node.CPUUsed / float64(len(node.InitCPU)),
types.ResourceMemory: 1.0 - float64(node.MemCap)/float64(node.InitMemCap),
types.ResourceStorage: node.StorageUsage(),
},
Capacity: 0,
Count: 0,
Deploy: 0,
}
result = append(result, nodeInfo)
}
Expand Down
23 changes: 12 additions & 11 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,23 +163,24 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions

total = utils.Min(volumeTotal, storTotal, total)

volumeSchedule := false
for _, volume := range opts.Volumes {
if volume.RequireSchedule() {
volumeSchedule = true
break
}
}
resourceType := types.GetGlobalResource(opts.CPUBind, volumeSchedule)

switch opts.DeployMethod {
case cluster.DeployAuto:
nodesInfo, err = c.scheduler.CommonDivision(nodesInfo, opts.Count, total)
case cluster.DeployEach:
nodesInfo, err = c.scheduler.EachDivision(nodesInfo, opts.Count, opts.NodesLimit)
nodesInfo, err = c.scheduler.EachDivision(nodesInfo, opts.Count, opts.NodesLimit, resourceType)
case cluster.DeployFill:
nodesInfo, err = c.scheduler.FillDivision(nodesInfo, opts.Count, opts.NodesLimit)
nodesInfo, err = c.scheduler.FillDivision(nodesInfo, opts.Count, opts.NodesLimit, resourceType)
case cluster.DeployGlobal:
volumeSchedule := false
for _, volume := range opts.Volumes {
if volume.RequireSchedule() {
volumeSchedule = true
break
}
}
globalResource := types.GetGlobalResource(opts.CPUBind, volumeSchedule)
nodesInfo, err = c.scheduler.GlobalDivision(nodesInfo, opts.Count, total, globalResource)
nodesInfo, err = c.scheduler.GlobalDivision(nodesInfo, opts.Count, total, resourceType)
default:
return types.ErrBadDeployMethod
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/complex/average.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// AveragePlan deploys containers evenly across nodes
func AveragePlan(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, error) {
func AveragePlan(nodesInfo []types.NodeInfo, need, limit int, resourceType types.ResourceType) ([]types.NodeInfo, error) {
log.Debugf("[AveragePlan] need %d limit %d", need, limit)
nodesInfoLength := len(nodesInfo)
if nodesInfoLength < limit {
Expand All @@ -21,7 +21,7 @@ func AveragePlan(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo,
if p == nodesInfoLength {
return nil, types.ErrInsufficientCap
}
nodesInfo = scoreSort(nodesInfo[p:])
nodesInfo = scoreSort(nodesInfo[p:], resourceType)
if limit > 0 {
nodesInfo = nodesInfo[:limit]
}
Expand Down
4 changes: 2 additions & 2 deletions scheduler/complex/fill.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// FillPlan fills each node up to the required container count
func FillPlan(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, error) {
func FillPlan(nodesInfo []types.NodeInfo, need, limit int, resourceType types.ResourceType) ([]types.NodeInfo, error) {
log.Debugf("[FillPlan] need %d limit %d", need, limit)
nodesInfoLength := len(nodesInfo)
if nodesInfoLength < limit {
Expand All @@ -21,7 +21,7 @@ func FillPlan(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, er
if p == nodesInfoLength {
return nil, types.ErrAlreadyFilled
}
nodesInfo = scoreSort(nodesInfo[p:])
nodesInfo = scoreSort(nodesInfo[p:], resourceType)
if limit > 0 && len(nodesInfo) > limit {
nodesInfo = nodesInfo[:limit]
}
Expand Down
8 changes: 4 additions & 4 deletions scheduler/complex/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (
)

// GlobalDivisionPlan 基于全局资源配额
func GlobalDivisionPlan(nodesInfo []types.NodeInfo, need int, resource types.GlobalResourceType) ([]types.NodeInfo, error) {
nodesInfo = scoreSort(nodesInfo)
func GlobalDivisionPlan(nodesInfo []types.NodeInfo, need int, resourceType types.ResourceType) ([]types.NodeInfo, error) {
nodesInfo = scoreSort(nodesInfo, resourceType)
length := len(nodesInfo)
i := 0

Expand All @@ -17,15 +17,15 @@ func GlobalDivisionPlan(nodesInfo []types.NodeInfo, need int, resource types.Glo
deploy := 0
delta := 0.0
if i < length-1 {
delta = utils.Round(nodesInfo[i+1].CPURate + nodesInfo[i+1].MemRate + nodesInfo[i+1].StorageRate - nodesInfo[i].CPURate + nodesInfo[i].MemRate + nodesInfo[i].StorageRate)
delta = utils.Round(nodesInfo[i+1].GetResourceUsage(resourceType) - nodesInfo[i].GetResourceUsage(resourceType))
i++
}
for j := 0; j <= p && need > 0 && delta >= 0; j++ {
// 剪枝 (prune)
if nodesInfo[j].Capacity == 0 {
continue
}
cost := utils.Round(nodesInfo[j].CPURate + nodesInfo[j].MemRate + nodesInfo[j].StorageRate)
cost := utils.Round(nodesInfo[j].GetResourceRate(resourceType))
deploy = int(delta / cost)
if deploy == 0 {
deploy = 1
Expand Down
10 changes: 5 additions & 5 deletions scheduler/complex/potassium.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,21 +219,21 @@ func (m *Potassium) CommonDivision(nodesInfo []types.NodeInfo, need, total int)
// EachDivision deploy containers by each node
// 容量够的机器每一台部署 N 个
// need 是每台机器所需总量,limit 是限制节点数
func (m *Potassium) EachDivision(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, error) {
return AveragePlan(nodesInfo, need, limit)
func (m *Potassium) EachDivision(nodesInfo []types.NodeInfo, need, limit int, resourceType types.ResourceType) ([]types.NodeInfo, error) {
return AveragePlan(nodesInfo, need, limit, resourceType)
}

// FillDivision deploy containers fill nodes by count
// 根据之前部署的策略每一台补充到 N 个,超过 N 个忽略
// need 是每台上限, limit 是限制节点数
func (m *Potassium) FillDivision(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, error) {
return FillPlan(nodesInfo, need, limit)
func (m *Potassium) FillDivision(nodesInfo []types.NodeInfo, need, limit int, resourceType types.ResourceType) ([]types.NodeInfo, error) {
return FillPlan(nodesInfo, need, limit, resourceType)
}

// GlobalDivision deploys containers by their resource costs
// 尽量使得资源消耗平均
// need 是所需总量,total 是支持部署总量
func (m *Potassium) GlobalDivision(nodesInfo []types.NodeInfo, need, total int, globalResource types.GlobalResourceType) ([]types.NodeInfo, error) {
func (m *Potassium) GlobalDivision(nodesInfo []types.NodeInfo, need, total int, globalResource types.ResourceType) ([]types.NodeInfo, error) {
if total < need {
return nil, types.NewDetailedErr(types.ErrInsufficientRes,
fmt.Sprintf("need: %d, vol: %d", need, total))
Expand Down
4 changes: 2 additions & 2 deletions scheduler/complex/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func shuffle(nodesInfo []types.NodeInfo) []types.NodeInfo {
return nodesInfo
}

func scoreSort(nodesInfo []types.NodeInfo) []types.NodeInfo {
func scoreSort(nodesInfo []types.NodeInfo, byResource types.ResourceType) []types.NodeInfo {
sort.Slice(nodesInfo, func(i, j int) bool {
return nodesInfo[i].CPURate+nodesInfo[i].MemRate+nodesInfo[i].StorageRate < nodesInfo[j].CPURate+nodesInfo[j].MemRate+nodesInfo[j].StorageRate
return nodesInfo[i].GetResourceRate(byResource) < nodesInfo[j].GetResourceRate(byResource)
})
return nodesInfo
}
6 changes: 3 additions & 3 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ type Scheduler interface {
// select nodes from nodes, return a list a nodenames and the corresponding volumemap
SelectVolumeNodes(nodeInfo []types.NodeInfo, vbs types.VolumeBindings) ([]types.NodeInfo, map[string][]types.VolumePlan, int, error)
// global division
GlobalDivision(nodesInfo []types.NodeInfo, need, total int, resource types.GlobalResourceType) ([]types.NodeInfo, error)
GlobalDivision(nodesInfo []types.NodeInfo, need, total int, resourceType types.ResourceType) ([]types.NodeInfo, error)
// common division
CommonDivision(nodesInfo []types.NodeInfo, need, total int) ([]types.NodeInfo, error)
// average division
EachDivision(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, error)
EachDivision(nodesInfo []types.NodeInfo, need, limit int, resourceType types.ResourceType) ([]types.NodeInfo, error)
// fill division
FillDivision(nodesInfo []types.NodeInfo, need, limit int) ([]types.NodeInfo, error)
FillDivision(nodesInfo []types.NodeInfo, need, limit int, resourceType types.ResourceType) ([]types.NodeInfo, error)
}
27 changes: 21 additions & 6 deletions types/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,9 @@ type NodeInfo struct {
NUMAMemory NUMAMemory
MemCap int64
StorageCap int64
CPUUsed float64 // CPU目前占用率
MemUsage float64 // MEM目前占用率
StorageUsage float64 // Current storage usage ratio
CPURate float64 // 需要增加的 CPU 占用率
MemRate float64 // 需要增加的内存占有率
StorageRate float64 // Storage ratio which would be allocated

Usages map[ResourceType]float64
Rates map[ResourceType]float64

CPUPlan []CPUMap
VolumePlans []VolumePlan // {{"AUTO:/data:rw:1024": "/mnt0:/data:rw:1024"}}
Expand All @@ -334,6 +331,24 @@ type NodeInfo struct {
// 其他需要 filter 的字段
}

// GetResourceUsage returns the summed current usage ratio of every
// resource type selected by the resource bitmask. Types not present
// in the mask contribute nothing to the result.
func (n *NodeInfo) GetResourceUsage(resource ResourceType) (usage float64) {
	for _, t := range AllResourceTypes {
		// skip resource types that the caller did not request
		if resource&t == 0 {
			continue
		}
		usage += n.Usages[t]
	}
	return usage
}

// GetResourceRate returns the summed allocation rate of every
// resource type selected by the resource bitmask. Types not present
// in the mask contribute nothing to the result.
func (n *NodeInfo) GetResourceRate(resource ResourceType) (rate float64) {
	for _, t := range AllResourceTypes {
		// skip resource types that the caller did not request
		if resource&t == 0 {
			continue
		}
		rate += n.Rates[t]
	}
	return rate
}

// NodeResource for node check
type NodeResource struct {
Name string
Expand Down

0 comments on commit 9faf9ea

Please sign in to comment.