realloc.go support bindCpu/unbindCpu (#190)
* realloc support bindCpu/unbindCpu
Tyraelone authored Apr 2, 2020
1 parent 2487e2a commit c60b6b5
Showing 8 changed files with 485 additions and 306 deletions.
31 changes: 23 additions & 8 deletions cluster/calcium/realloc.go
@@ -22,7 +22,7 @@ type nodeContainers map[string][]*types.Container
type volCPUMemNodeContainers map[string]map[float64]map[int64]nodeContainers

// ReallocResource allow realloc container resource
func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64, memory int64, volumes types.VolumeBindings) (chan *types.ReallocResourceMessage, error) {
func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64, memory int64, volumes types.VolumeBindings, bindCPUOpt types.BindCPUOptions) (chan *types.ReallocResourceMessage, error) {
ch := make(chan *types.ReallocResourceMessage)
go func() {
defer close(ch)
@@ -58,7 +58,7 @@ func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64
for pod, nodeContainersInfo := range containersInfo {
go func(pod *types.Pod, nodeContainersInfo nodeContainers) {
defer wg.Done()
c.doReallocContainer(ctx, ch, pod, nodeContainersInfo, cpu, memory, volumes)
c.doReallocContainer(ctx, ch, pod, nodeContainersInfo, cpu, memory, volumes, bindCPUOpt)
}(pod, nodeContainersInfo)
}
wg.Wait()
@@ -129,7 +129,8 @@ func (c *Calcium) doReallocContainer(
ch chan *types.ReallocResourceMessage,
pod *types.Pod,
nodeContainersInfo nodeContainers,
cpu float64, memory int64, volumes types.VolumeBindings) {
cpu float64, memory int64, volumes types.VolumeBindings,
bindCPUOpt types.BindCPUOptions) {

volCPUMemNodeContainersInfo, hardVbsForContainer := calVolCPUMemNodeContainersInfo(ch, nodeContainersInfo, cpu, memory, volumes)

@@ -151,10 +152,19 @@ func (c *Calcium) doReallocContainer(
if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
node.IncrNUMANodeMemory(nodeID, container.Memory)
}
if len(container.CPU) > 0 {
// decide CPU binding according to bindCPUOpt
switch bindCPUOpt {
case types.BindCPUOptionKeep:
if len(container.CPU) > 0 {
containerWithCPUBind++
}
case types.BindCPUOptionBind:
containerWithCPUBind++
case types.BindCPUOptionUnbind:
containerWithCPUBind = 0
}
}

// check memory
if newMemory != 0 {
if cap := int(node.MemCap / newMemory); cap < len(containers) {
@@ -190,7 +200,7 @@
Memory: newMemory,
}

if err := c.updateContainersResources(ctx, ch, node, containers, newResource, cpusets, hardVbsForContainer, newAutoVol); err != nil {
if err := c.updateContainersResources(ctx, ch, node, containers, newResource, cpusets, hardVbsForContainer, newAutoVol, bindCPUOpt); err != nil {
return err
}

@@ -214,8 +224,10 @@
}
}

func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.ReallocResourceMessage, node *types.Node, containers []*types.Container,
newResource *enginetypes.VirtualizationResource, cpusets []types.CPUMap, hardVbsForContainer map[string]types.VolumeBindings, newAutoVol string) error {
func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.ReallocResourceMessage,
node *types.Node, containers []*types.Container,
newResource *enginetypes.VirtualizationResource,
cpusets []types.CPUMap, hardVbsForContainer map[string]types.VolumeBindings, newAutoVol string, bindCPUOpt types.BindCPUOptions) error {

autoVbs, _ := types.MakeVolumeBindings(strings.Split(newAutoVol, ","))
planForContainers, err := c.reallocVolume(node, containers, autoVbs)
@@ -224,7 +236,10 @@ func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.
}

for _, container := range containers {
if len(container.CPU) > 0 {
// case 1: the container already has bound CPUs; keep them unchanged
if (len(container.CPU) > 0 && bindCPUOpt == types.BindCPUOptionKeep) ||
// case 2: an explicit bind was requested; allocate whether or not a CPU map existed before
bindCPUOpt == types.BindCPUOptionBind {
newResource.CPU = cpusets[0]
newResource.NUMANode = node.GetNUMANode(cpusets[0])
cpusets = cpusets[1:]
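The hunks above count, for each node, how many containers should end up pinned to CPUs (containerWithCPUBind) and hand that count on to the scheduler via updateContainersResources. They rely on types.BindCPUOptions and its three constants, whose declaration lives in one of the other changed files and is not shown here. A minimal sketch of what that declaration and the per-container decision amount to, assuming the constant names used in the diff:

package types

// BindCPUOptions tells ReallocResource how to treat CPU binding.
// Sketch only: the real declaration ships elsewhere in this commit.
type BindCPUOptions int

const (
	BindCPUOptionKeep   BindCPUOptions = iota // keep each container's current binding state
	BindCPUOptionBind                         // pin every container, previously bound or not
	BindCPUOptionUnbind                       // release all existing bindings
)

// wantsCPUBind is a hypothetical helper restating the switch above for a
// single container: Keep preserves the status quo, Bind always binds,
// Unbind never does (the diff expresses this by zeroing the counter).
func wantsCPUBind(opt BindCPUOptions, hasBinding bool) bool {
	switch opt {
	case BindCPUOptionBind:
		return true
	case BindCPUOptionUnbind:
		return false
	default: // BindCPUOptionKeep
		return hasBinding
	}
}

Zeroing the counter under Unbind means no cpuset is requested from the scheduler at all, which is why the unbind assertions in TestReallocBindCpu below expect an empty CPU map afterwards.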
134 changes: 121 additions & 13 deletions cluster/calcium/realloc_test.go
@@ -2,6 +2,7 @@ package calcium

import (
"context"
complexscheduler "github.com/projecteru2/core/scheduler/complex"
"testing"

"github.com/stretchr/testify/assert"
@@ -70,36 +71,36 @@ func TestRealloc(t *testing.T) {
store.On("GetContainers", mock.Anything, []string{"c1", "c2"}).Return([]*types.Container{c1, c2}, nil)
// failed by lock
store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err := c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil)
ch, err := c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil, types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
// failed by GetPod
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, types.ErrNoETCD).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil, types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
// failed by newCPU < 0
ch, err = c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil, types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
// failed by GetNode
store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.GiB), nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.GiB), nil, types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
// failed by memory not enough
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.GiB), nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.GiB), nil, types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
@@ -112,14 +113,14 @@ func TestRealloc(t *testing.T) {
"c1": {{types.MustToVolumeBinding("AUTO:/data:rw:50"): types.VolumeMap{"/dir0": 50}}},
}
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"})).Return(nil, nodeVolumePlans, 1, nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.MiB), nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.MiB), nil, types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
// failed by wrong total
simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, 0, nil).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.MiB), nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.MiB), nil, types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
@@ -144,7 +145,7 @@ func TestRealloc(t *testing.T) {
Engine: engine,
Endpoint: "http://1.1.1.1:1",
}
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, 2*int64(units.MiB), nil)
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, 2*int64(units.MiB), nil, types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
@@ -156,7 +157,7 @@
store.On("UpdateNode", mock.Anything, mock.Anything).Return(nil)
// failed by update container
store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Once()
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, 2*int64(units.MiB), nil)
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, 2*int64(units.MiB), nil, types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
@@ -171,21 +172,21 @@
},
}
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data:rw:100"})).Return(nil, nodeVolumePlans, 4, nil).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}))
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}), types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
// failed by volume schedule error
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nil, 0, types.ErrInsufficientVolume).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}))
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}), types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
// failed due to re-volume plan less than container number
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nodeVolumePlans, 0, nil).Twice()
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}))
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}), types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
@@ -249,7 +250,7 @@ func TestRealloc(t *testing.T) {
store.On("GetNode", mock.Anything, "node2").Return(node2, nil)
store.On("GetContainers", mock.Anything, []string{"c3", "c4"}).Return([]*types.Container{c3, c4}, nil)
store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Twice()
ch, err = c.ReallocResource(ctx, []string{"c3", "c4"}, 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}))
ch, err = c.ReallocResource(ctx, []string{"c3", "c4"}, 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}), types.BindCPUOptionKeep)
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
@@ -259,6 +260,7 @@
assert.Equal(t, node2.MemCap, int64(units.GiB)-4*int64(units.MiB))
assert.Equal(t, node2.Volume, types.VolumeMap{"/dir0": 250, "/dir1": 200, "/dir2": 200})
assert.Equal(t, node2.VolumeUsed, int64(250))

}

func TestReallocVolume(t *testing.T) {
@@ -388,3 +390,109 @@
assert.Equal(t, plans[c2][*newVbs[2]], types.VolumeMap{"/dir1": 110})
assert.Equal(t, plans[c2][*newVbs[3]], types.VolumeMap{"/dir0": 20})
}

func TestReallocBindCpu(t *testing.T) {

c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
c.store = store
pod1 := &types.Pod{
Name: "p1",
}
lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(nil)
lock.On("Unlock", mock.Anything).Return(nil)
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
engine := &enginemocks.API{}
engine.On("VirtualizationInspect", mock.Anything, mock.Anything).Return(&enginetypes.VirtualizationInfo{}, nil)

config := types.Config{
LogLevel: "",
Bind: "",
LockTimeout: 0,
GlobalTimeout: 0,
Statsd: "",
Profile: "",
CertPath: "",
Auth: types.AuthConfig{},
GRPCConfig: types.GRPCConfig{},
Git: types.GitConfig{},
Etcd: types.EtcdConfig{},
Docker: types.DockerConfig{},
Scheduler: types.SchedConfig{MaxShare: -1, ShareBase: 100},
Virt: types.VirtConfig{},
Systemd: types.SystemdConfig{},
}
simpleMockScheduler, _ := complexscheduler.New(config)
c.scheduler = simpleMockScheduler

// test bindCpu
node3 := &types.Node{
Name: "node3",
MemCap: int64(units.GiB),
CPU: types.CPUMap{"0": 10, "1": 70, "2": 10, "3": 100},
CPUUsed: 2.1,
Engine: engine,
Endpoint: "http://1.1.1.1:1",
NUMA: types.NUMA{"2": "0"},
NUMAMemory: types.NUMAMemory{"0": 100000},
Volume: types.VolumeMap{"/dir0": 200, "/dir1": 200, "/dir2": 200},
VolumeUsed: int64(300),
}
c5 := &types.Container{
ID: "c5",
Podname: "p1",
Engine: engine,
Memory: 5 * int64(units.MiB),
Quota: 0.9,
CPU: types.CPUMap{"2": 90},
Nodename: "node3",
}
c6 := &types.Container{
ID: "c6",
Podname: "p1",
Engine: engine,
Memory: 5 * int64(units.MiB),
Quota: 0.9,
Nodename: "node3",
}

store.On("GetNode", mock.Anything, "node3").Return(node3, nil)
store.On("GetContainers", mock.Anything, []string{"c5"}).Return([]*types.Container{c5}, nil)
store.On("GetContainers", mock.Anything, []string{"c6"}).Return([]*types.Container{c6}, nil)
store.On("GetContainers", mock.Anything, []string{"c6", "c5"}).Return([]*types.Container{c5, c6}, nil)
engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateNode", mock.Anything, mock.Anything).Return(nil)
store.On("UpdateContainer", mock.Anything, mock.Anything).Return(nil)
ch, err := c.ReallocResource(ctx, []string{"c5"}, 0.1, 2*int64(units.MiB), nil, types.BindCPUOptionUnbind)
for r := range ch {
assert.NoError(t, r.Error)
}
assert.NoError(t, err)
assert.Empty(t, c5.CPU)

ch, err = c.ReallocResource(ctx, []string{"c6"}, 0.1, 2*int64(units.MiB), nil, types.BindCPUOptionBind)
for r := range ch {
assert.NoError(t, r.Error)
}
assert.NoError(t, err)
assert.NotEmpty(t, c6.CPU)

ch, err = c.ReallocResource(ctx, []string{"c6", "c5"}, -0.1, 2*int64(units.MiB), nil, types.BindCPUOptionBind)
for r := range ch {
assert.NoError(t, r.Error)
}
assert.NoError(t, err)
assert.NotEmpty(t, c6.CPU)
assert.NotEmpty(t, c5.CPU)
ch, err = c.ReallocResource(ctx, []string{"c6", "c5"}, -0.1, 2*int64(units.MiB), nil, types.BindCPUOptionUnbind)
for r := range ch {
assert.NoError(t, r.Error)
}
assert.NoError(t, err)
assert.Empty(t, c6.CPU)
assert.Empty(t, c5.CPU)

}
3 changes: 1 addition & 2 deletions cluster/cluster.go
@@ -2,7 +2,6 @@ package cluster

import (
"context"

enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/types"
)
@@ -86,7 +85,7 @@ type Cluster interface {
RemoveContainer(ctx context.Context, IDs []string, force bool, step int) (chan *types.RemoveContainerMessage, error)
DissociateContainer(ctx context.Context, IDs []string) (chan *types.DissociateContainerMessage, error)
ControlContainer(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlContainerMessage, error)
ReallocResource(ctx context.Context, IDs []string, cpu float64, memory int64, volumes types.VolumeBindings) (chan *types.ReallocResourceMessage, error)
ReallocResource(ctx context.Context, IDs []string, cpu float64, memory int64, volumes types.VolumeBindings, bindCPUOpt types.BindCPUOptions) (chan *types.ReallocResourceMessage, error)
LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)
RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachContainerMessage, error)
ExecuteContainer(ctx context.Context, opts *types.ExecuteContainerOptions, inCh <-chan []byte) chan *types.AttachContainerMessage
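Since ReallocResource sits on the exported Cluster interface, every caller now has to pass a bind option explicitly, with BindCPUOptionKeep reproducing the previous behaviour. A minimal caller-side sketch against the new signature follows; the wrapper function and the resource values are illustrative, while the Error field on the result message is the one the tests above assert on.

package example

import (
	"context"

	"github.com/docker/go-units"
	"github.com/projecteru2/core/cluster"
	"github.com/projecteru2/core/types"
)

// reallocAndBind bumps CPU/memory for the given containers and pins them to
// CPUs, draining the channel that ReallocResource closes when all pods are done.
func reallocAndBind(ctx context.Context, c cluster.Cluster, ids []string) error {
	ch, err := c.ReallocResource(ctx, ids, 0.5, 512*int64(units.MiB), nil, types.BindCPUOptionBind)
	if err != nil {
		return err
	}
	for msg := range ch {
		if msg.Error != nil {
			return msg.Error // surface the first per-container failure
		}
	}
	return nil
}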
5 changes: 2 additions & 3 deletions cluster/mocks/Cluster.go

Some generated files are not rendered by default.
