support numa architecture
CMGS committed Jun 11, 2019
1 parent d3beafa commit 132231f
Showing 12 changed files with 188 additions and 22 deletions.
1 change: 1 addition & 0 deletions cluster/calcium/create.go
@@ -245,6 +245,7 @@ func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, opts *t
config.CPU = cpumap.Map()
config.Quota = opts.CPUQuota
config.Memory = opts.Memory
config.NUMANode = utils.GetNUMAMemoryNode(node, cpumap)
config.SoftLimit = opts.SoftLimit
entry := opts.Entrypoint

14 changes: 12 additions & 2 deletions cluster/calcium/realloc.go
@@ -6,10 +6,9 @@ import (

"github.com/sanity-io/litter"

"github.com/projecteru2/core/utils"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
)

@@ -110,6 +109,11 @@ func (c *Calcium) doReallocContainer(
node.CPU.Add(container.CPU)
node.SetCPUUsed(container.Quota, types.DecrUsage)
node.MemCap += container.Memory
if nodeID := utils.GetNUMAMemoryNode(node, container.CPU); nodeID != "" {
if _, ok := node.NUMAMem[nodeID]; ok {
node.NUMAMem[nodeID] += container.Memory
}
}
if len(container.CPU) > 0 {
containerWithCPUBind++
}
@@ -139,6 +143,7 @@ func (c *Calcium) doReallocContainer(
newResource := &enginetypes.VirtualizationResource{Quota: newCPU, Memory: newMemory, SoftLimit: container.SoftLimit}
if len(container.CPU) > 0 {
newResource.CPU = cpusets[0]
newResource.NUMANode = utils.GetNUMAMemoryNode(node, newResource.CPU)
cpusets = cpusets[1:]
}
updateSuccess := false
@@ -157,6 +162,11 @@ func (c *Calcium) doReallocContainer(
node.CPU.Sub(container.CPU)
node.SetCPUUsed(container.Quota, types.IncrUsage)
node.MemCap -= container.Memory
if nodeID := utils.GetNUMAMemoryNode(node, container.CPU); nodeID != "" {
if _, ok := node.NUMAMem[nodeID]; ok {
node.NUMAMem[nodeID] -= container.Memory
}
}
// update container metadata
if err := c.store.UpdateContainer(ctx, container); err == nil {
setSuccess = true
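A note on the bookkeeping in the hunks above: doReallocContainer first credits the container's old CPU, quota, and memory back to the node (including the per-NUMA memory counter whenever the old cpuset sat entirely on one NUMA node), computes the new plan, then debits the new usage symmetrically. A minimal sketch of just the NUMA-memory half of that credit/debit cycle; reallocNUMAMem is a hypothetical helper, not a function in this commit:

    package main

    import "fmt"

    // reallocNUMAMem mirrors the NUMAMem credit/debit pattern in
    // doReallocContainer: credit the old usage back, then debit the new usage.
    // A "" node ID means the container's CPUs span NUMA nodes, so no per-node
    // accounting happens.
    func reallocNUMAMem(numaMem map[string]int64, oldNode, newNode string, oldMem, newMem int64) {
        if _, ok := numaMem[oldNode]; ok && oldNode != "" {
            numaMem[oldNode] += oldMem // give the old memory back
        }
        if _, ok := numaMem[newNode]; ok && newNode != "" {
            numaMem[newNode] -= newMem // take memory for the new plan
        }
    }

    func main() {
        numaMem := map[string]int64{"node0": 1 << 30}
        reallocNUMAMem(numaMem, "node0", "node0", 512<<20, 768<<20)
        fmt.Println(numaMem["node0"]) // 805306368, i.e. 1GB + 512MB - 768MB
    }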
4 changes: 2 additions & 2 deletions engine/docker/container.go
@@ -50,7 +50,7 @@ func (e *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.Vir
OpenStdin: opts.Stdin,
}

resource := makeResourceSetting(opts.Quota, opts.Memory, opts.CPU, opts.SoftLimit)
resource := makeResourceSetting(opts.Quota, opts.Memory, opts.CPU, opts.NUMANode, opts.SoftLimit)

resource.Ulimits = []*units.Ulimit{}
for name, u := range opts.Ulimits {
@@ -210,7 +210,7 @@ func (e *Engine) VirtualizationWait(ctx context.Context, ID, state string) (*eng

// VirtualizationUpdateResource update virtualization resource
func (e *Engine) VirtualizationUpdateResource(ctx context.Context, ID string, opts *enginetypes.VirtualizationResource) error {
newResource := makeResourceSetting(opts.Quota, opts.Memory, opts.CPU, opts.SoftLimit)
newResource := makeResourceSetting(opts.Quota, opts.Memory, opts.CPU, opts.NUMANode, opts.SoftLimit)
updateConfig := dockercontainer.UpdateConfig{Resources: newResource}
_, err := e.client.ContainerUpdate(ctx, ID, updateConfig)
return err
4 changes: 3 additions & 1 deletion engine/docker/helper.go
@@ -105,7 +105,7 @@ func makeMountPaths(opts *enginetypes.VirtualizationCreateOptions) ([]string, ma
return binds, volumes
}

func makeResourceSetting(cpu float64, memory int64, cpuMap map[string]int, softlimit bool) dockercontainer.Resources {
func makeResourceSetting(cpu float64, memory int64, cpuMap map[string]int, numaNode string, softlimit bool) dockercontainer.Resources {
resource := dockercontainer.Resources{}
if cpu > 0 {
resource.CPUPeriod = corecluster.CPUPeriodBase
@@ -117,6 +117,8 @@ func makeResourceSetting(cpu float64, memory int64, cpuMap map[string]int, softl
cpuIDs = append(cpuIDs, cpuID)
}
resource.CpusetCpus = strings.Join(cpuIDs, ",")
// numaNode is either empty or a NUMA node ID
resource.CpusetMems = numaNode
}
if softlimit {
resource.MemoryReservation = memory
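For context, CpusetCpus and CpusetMems land on the cpuset cgroup controller, the same knobs as docker run --cpuset-cpus and --cpuset-mems. Below is a sketch of the Resources value makeResourceSetting would now emit for a container bound to CPUs 1 and 3 on NUMA node 0; the 100000µs period is an assumed stand-in for corecluster.CPUPeriodBase, and the quota formula is paraphrased from the surrounding hunk rather than quoted from it:

    package main

    import (
        "fmt"

        dockercontainer "github.com/docker/docker/api/types/container"
    )

    func main() {
        const cpuPeriodBase = 100000 // assumed value of corecluster.CPUPeriodBase
        // Roughly what makeResourceSetting(1.5, 512<<20, map[string]int{"1": 100, "3": 100}, "0", false)
        // would produce after this change:
        resource := dockercontainer.Resources{
            CPUPeriod:  cpuPeriodBase,
            CPUQuota:   int64(1.5 * cpuPeriodBase),
            CpusetCpus: "1,3", // pin the container to CPUs 1 and 3
            CpusetMems: "0",   // pin its memory to NUMA node 0; "" leaves placement to the kernel
            Memory:     512 << 20,
        }
        fmt.Printf("%+v\n", resource)
    }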
1 change: 1 addition & 0 deletions engine/types/virtualization.go
@@ -6,6 +6,7 @@ type VirtualizationResource struct {
Quota float64 // for cpu quota
Memory int64 // for memory binding
SoftLimit bool // soft limit or not
NUMANode string // numa node
}

// VirtualizationUlimits define hard and soft limit
74 changes: 59 additions & 15 deletions scheduler/complex/cpu.go
@@ -161,26 +161,55 @@ func (h *host) getFullResult(full int, cpus []cpuInfo) []types.CPUMap {

func cpuPriorPlan(cpu float64, memory int64, nodesInfo []types.NodeInfo, maxShareCore, coreShare int) ([]types.NodeInfo, map[string][]types.CPUMap, int, error) {
var nodeContainer = map[string][]types.CPUMap{}
var host *host
var plan []types.CPUMap

// TODO weird check cpu < 0.01

volTotal := 0
memLimit := 0

for p, nodeInfo := range nodesInfo {
host = newHost(nodeInfo.CPUMap, coreShare)
plan = host.getContainerCores(cpu, maxShareCore)
memLimit = int(nodeInfo.MemCap / memory)
cap := len(plan) // how many containers each node can hold
if cap > memLimit {
plan = plan[:memLimit]
cap = memLimit
// aggregate global CPU, used for non-NUMA or cross-NUMA allocation
globalCPUMap := nodeInfo.CPUMap
// aggregate global memory
globalMemCap := nodeInfo.MemCap
// compute the allocation plan for each NUMA node
// group CPUs by NUMA node
numaCPUMap := map[string]types.CPUMap{}
for cpuID, nodeID := range nodeInfo.NUMA {
if _, ok := numaCPUMap[nodeID]; !ok {
numaCPUMap[nodeID] = types.CPUMap{}
}
cpuCount, ok := nodeInfo.CPUMap[cpuID]
if !ok {
continue
}
numaCPUMap[nodeID][cpuID] = cpuCount
}
for nodeID, nodeCPUMap := range numaCPUMap {
nodeMemCap, ok := nodeInfo.NUMAMem[nodeID]
if !ok {
continue
}
cap, plan := calcuateCPUPlan(nodeCPUMap, nodeMemCap, cpu, memory, maxShareCore, coreShare)
if cap > 0 {
if _, ok := nodeContainer[nodeInfo.Name]; !ok {
nodeContainer[nodeInfo.Name] = []types.CPUMap{}
}
nodesInfo[p].Capacity += cap
volTotal += cap
globalMemCap -= int64(cap) * memory
for _, cpuPlan := range plan {
globalCPUMap.Sub(cpuPlan)
nodeContainer[nodeInfo.Name] = append(nodeContainer[nodeInfo.Name], cpuPlan)
}
}
}
// non-NUMA case,
// or the resources left over after the NUMA allocation
cap, plan := calcuateCPUPlan(globalCPUMap, globalMemCap, cpu, memory, maxShareCore, coreShare)
if cap > 0 {
nodesInfo[p].Capacity = cap
nodeContainer[nodeInfo.Name] = plan
if _, ok := nodeContainer[nodeInfo.Name]; !ok {
nodeContainer[nodeInfo.Name] = []types.CPUMap{}
}
nodesInfo[p].Capacity += cap
volTotal += cap
nodeContainer[nodeInfo.Name] = append(nodeContainer[nodeInfo.Name], plan...)
}
}

@@ -194,3 +223,18 @@ func cpuPriorPlan(cpu float64, memory int64, nodesInfo []types.NodeInfo, maxShar
log.Debugf("[cpuPriorPlan] nodesInfo: %v", nodesInfo)
return nodesInfo[p:], nodeContainer, volTotal, nil
}

func calcuateCPUPlan(CPUMap types.CPUMap, MemCap int64, cpu float64, memory int64, maxShareCore, coreShare int) (int, []types.CPUMap) {
host := newHost(CPUMap, coreShare)
plan := host.getContainerCores(cpu, maxShareCore)
memLimit := int(MemCap / memory)
cap := len(plan) // how many containers each node can hold
if cap > memLimit {
plan = plan[:memLimit]
cap = memLimit
}
if cap <= 0 {
plan = nil
}
return cap, plan
}
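Taken together, cpuPriorPlan now allocates in two passes: first it fits as many containers as possible entirely inside each NUMA node (CPU and memory drawn from the same node), then it runs one more pass over the global remainder for non-NUMA or cross-NUMA placement. A self-contained sketch of the capacity arithmetic, simplified to whole-core requests (the real calcuateCPUPlan delegates CPU fitting to host.getContainerCores):

    package main

    import "fmt"

    // capacity mirrors the clamp at the end of calcuateCPUPlan: a pool holds as
    // many containers as fit by CPU, clamped by how many times the requested
    // memory fits. Illustrative only, not the real scheduler.
    func capacity(cores int, memCap int64, cpu int, memory int64) int {
        cpuLimit := cores / cpu
        memLimit := int(memCap / memory)
        if cpuLimit > memLimit {
            return memLimit
        }
        return cpuLimit
    }

    func main() {
        const gByte = int64(1) << 30
        // The topology from the new test below: two NUMA nodes with 2 cores and
        // 1GB each, 3GB total. Each container asks for 1 core and 1GB.
        perNUMA := capacity(2, gByte, 1, gByte) // pass 1: one container per NUMA node
        // Pass 2: 4-2 = 2 cores and 3GB-2GB = 1GB remain globally.
        leftover := capacity(2, gByte, 1, gByte) // one more, cross-NUMA
        fmt.Println(2*perNUMA + leftover) // 3, matching the test's total
    }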
58 changes: 58 additions & 0 deletions scheduler/complex/cpu_test.go
@@ -0,0 +1,58 @@
package complexscheduler

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/projecteru2/core/types"
)

func TestCPUPriorPlan(t *testing.T) {
// normal allocation
nodesInfo := resetNodesInfo()
_, resultCPUPlan, total, err := cpuPriorPlan(3.0, types.MByte, nodesInfo, -1, 100)
assert.NoError(t, err)
assert.Equal(t, len(resultCPUPlan), 1)
assert.Equal(t, total, 1)
// NUMA allocation
nodesInfo = resetNodesInfo()
_, resultCPUPlan, total, err = cpuPriorPlan(1.5, types.MByte, nodesInfo, -1, 100)
assert.NoError(t, err)
assert.Equal(t, len(resultCPUPlan), 1)
assert.Equal(t, total, 2)
r := resultCPUPlan["n1"]
for _, p := range r {
_, ok1 := p["1"]
_, ok2 := p["2"]
_, ok3 := p["3"]
_, ok4 := p["4"]
assert.True(t, (ok1 && ok3) || (ok2 && ok4))
}
// NUMA and normal allocation
nodesInfo = resetNodesInfo()
_, resultCPUPlan, total, err = cpuPriorPlan(1, types.GByte, nodesInfo, -1, 100)
assert.NoError(t, err)
assert.Equal(t, len(resultCPUPlan), 1)
assert.Equal(t, total, 3)
}

func resetNodesInfo() []types.NodeInfo {
return []types.NodeInfo{
types.NodeInfo{
Name: "n1",
CPUMap: types.CPUMap{"1": 100, "2": 100, "3": 100, "4": 100},
MemCap: 3 * types.GByte,
NUMA: types.NUMA{
"1": "node0",
"2": "node1",
"3": "node0",
"4": "node1",
},
NUMAMem: types.NUMAMem{
"node0": types.GByte,
"node1": types.GByte,
},
},
}
}
10 changes: 10 additions & 0 deletions store/etcdv3/node.go
@@ -173,10 +173,20 @@ func (m *Mercury) UpdateNodeResource(ctx context.Context, node *types.Node, cpu
node.CPU.Add(cpu)
node.SetCPUUsed(quota, types.DecrUsage)
node.MemCap += mem
if nodeID := utils.GetNUMAMemoryNode(node, cpu); nodeID != "" {
if _, ok := node.NUMAMem[nodeID]; ok {
node.NUMAMem[nodeID] += mem
}
}
case store.ActionDecr:
node.CPU.Sub(cpu)
node.SetCPUUsed(quota, types.IncrUsage)
node.MemCap -= mem
if nodeID := utils.GetNUMAMemoryNode(node, cpu); nodeID != "" {
if _, ok := node.NUMAMem[nodeID]; ok {
node.NUMAMem[nodeID] -= mem
}
}
default:
return types.ErrUnknownControlType
}
9 changes: 7 additions & 2 deletions store/etcdv3/node_test.go
@@ -102,16 +102,21 @@ func TestNode(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, newNode.MemCap, int64(100))
assert.Equal(t, newNode.CPUUsed, 0.0)
assert.NoError(t, m.UpdateNodeResource(ctx, node, types.CPUMap{"0": 10}, 0.01, int64(0), "-"))
// numa mem record
node.NUMA = types.NUMA{"0": "n1"}
node.NUMAMem = map[string]int64{"n1": 100}
assert.NoError(t, m.UpdateNodeResource(ctx, node, types.CPUMap{"0": 10}, 0.01, int64(1), "-"))
newNode, err = m.GetNodeByName(ctx, nodename)
assert.NoError(t, err)
assert.Equal(t, newNode.CPU["0"], 90)
assert.Equal(t, newNode.CPUUsed, 0.01)
assert.NoError(t, m.UpdateNodeResource(ctx, node, types.CPUMap{"0": 10}, 0.01, int64(0), "+"))
assert.Equal(t, newNode.NUMAMem["n1"], int64(99))
assert.NoError(t, m.UpdateNodeResource(ctx, node, types.CPUMap{"0": 10}, 0.01, int64(1), "+"))
newNode, err = m.GetNodeByName(ctx, nodename)
assert.NoError(t, err)
assert.Equal(t, newNode.CPU["0"], 100)
assert.Equal(t, newNode.CPUUsed, 0.0)
assert.Equal(t, newNode.NUMAMem["n1"], int64(100))
err = m.UpdateNodeResource(ctx, node, nil, 0, int64(0), "abc")
assert.Error(t, err)
assert.Equal(t, err.Error(), types.ErrUnknownControlType.Error())
10 changes: 10 additions & 0 deletions types/node.go
@@ -53,12 +53,20 @@ func (c CPUMap) Map() map[string]int {
return map[string]int(c)
}

// NUMA defines the cpuID -> NUMA nodeID mapping
type NUMA map[string]string

// NUMAMem defines the NUMA nodeID -> memory capacity mapping
type NUMAMem map[string]int64

// Node store node info
type Node struct {
Name string `json:"name"`
Endpoint string `json:"endpoint"`
Podname string `json:"podname"`
CPU CPUMap `json:"cpu"`
NUMA NUMA `json:"numa"`
NUMAMem NUMAMem `json:"numa_mem"`
CPUUsed float64 `json:"cpuused"`
MemCap int64 `json:"memcap"`
Available bool `json:"available"`
@@ -91,6 +99,8 @@ func (n *Node) SetCPUUsed(quota float64, action string) {
type NodeInfo struct {
Name string
CPUMap CPUMap
NUMA NUMA
NUMAMem NUMAMem
MemCap int64
CPUUsed float64 // current CPU usage rate
MemUsage float64 // current memory usage rate
15 changes: 15 additions & 0 deletions utils/utils.go
@@ -262,3 +262,18 @@ func safeSplit(s string) []string {

return result
}

// GetNUMAMemoryNode returns the NUMA memory node shared by every CPU in cpu, or "" if they span nodes
func GetNUMAMemoryNode(node *types.Node, cpu types.CPUMap) string {
nodeID := ""
for cpuID := range cpu {
if memoryNode, ok := node.NUMA[cpuID]; ok {
if nodeID == "" {
nodeID = memoryNode
} else if nodeID != memoryNode { // CPUs span NUMA nodes; let the OS decide the nodeID
nodeID = ""
}
}
}
return nodeID
}
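The upshot: a container whose cpuset sits entirely inside one NUMA node gets that node's ID, which downstream becomes the cpuset.mems binding and the per-node memory accounting, while any cross-node spread yields "" so memory placement is left to the kernel. A standalone restatement of the rule, illustrative only, returning early at the first mismatch and taking an ordered slice for deterministic iteration:

    package main

    import "fmt"

    // numaNodeFor restates GetNUMAMemoryNode's rule: all CPUs on one NUMA
    // node yield that node's ID; any cross-node spread yields "".
    func numaNodeFor(numa map[string]string, cpuIDs []string) string {
        nodeID := ""
        for _, cpuID := range cpuIDs {
            memoryNode, ok := numa[cpuID]
            if !ok {
                continue
            }
            if nodeID == "" {
                nodeID = memoryNode
            } else if nodeID != memoryNode {
                return "" // spans NUMA nodes: let the kernel place memory
            }
        }
        return nodeID
    }

    func main() {
        numa := map[string]string{"1": "node0", "2": "node1", "3": "node0"}
        fmt.Println(numaNodeFor(numa, []string{"1", "3"})) // "node0"
        fmt.Println(numaNodeFor(numa, []string{"1", "2"})) // "" (cross-node)
    }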
10 changes: 10 additions & 0 deletions utils/utils_test.go
@@ -201,3 +201,13 @@ func TestRound(t *testing.T) {
a = 19.99998
assert.Equal(t, f(Round(a)), "20")
}

func TestGetNUMAMemoryNode(t *testing.T) {
node := &types.Node{
NUMA: types.NUMA{"1": "node1", "2": "node2", "3": "node1", "4": "node2"},
}
nodeID := GetNUMAMemoryNode(node, types.CPUMap{"1": 100, "2": 100})
assert.Equal(t, nodeID, "")
nodeID = GetNUMAMemoryNode(node, types.CPUMap{"1": 100, "3": 100})
assert.Equal(t, nodeID, "node1")
}
