Skip to content

Commit

Permalink
use tri options to revise cpu bind and memory limit (#227)
Browse files Browse the repository at this point in the history
* use tri options to revise cpu bind and memory limit

* refactor realloc resource interface with options
  • Loading branch information
CMGS committed Jul 23, 2020
1 parent 83ddc24 commit 92447db
Show file tree
Hide file tree
Showing 8 changed files with 390 additions and 419 deletions.
41 changes: 17 additions & 24 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ type nodeContainers map[string][]*types.Container
type volCPUMemNodeContainers map[string]map[float64]map[int64]nodeContainers

// ReallocResource allow realloc container resource
func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64, bindCPU types.BindCPUOptions, memory int64, memoryLimit types.MemoryLimitOptions, volumes types.VolumeBindings) (chan *types.ReallocResourceMessage, error) {
func (c *Calcium) ReallocResource(ctx context.Context, opts *types.ReallocOptions) (chan *types.ReallocResourceMessage, error) {
ch := make(chan *types.ReallocResourceMessage)
go func() {
defer close(ch)
if err := c.withContainersLocked(ctx, IDs, func(containers map[string]*types.Container) error {
if err := c.withContainersLocked(ctx, opts.IDs, func(containers map[string]*types.Container) error {
// Pod-Node-Containers
containersInfo := map[*types.Pod]nodeContainers{}
// Pod cache
Expand Down Expand Up @@ -58,14 +58,14 @@ func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64
for _, nodeContainersInfo := range containersInfo {
go func(nodeContainersInfo nodeContainers) {
defer wg.Done()
c.doReallocContainer(ctx, ch, nodeContainersInfo, cpu, bindCPU, memory, memoryLimit, volumes)
c.doReallocContainer(ctx, ch, nodeContainersInfo, opts)
}(nodeContainersInfo)
}
wg.Wait()
return nil
}); err != nil {
log.Errorf("[ReallocResource] Realloc failed %v", err)
for _, ID := range IDs {
for _, ID := range opts.IDs {
ch <- &types.ReallocResourceMessage{
ContainerID: ID,
Error: err,
Expand All @@ -76,15 +76,8 @@ func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64
return ch, nil
}

func (c *Calcium) doReallocContainer(
ctx context.Context,
ch chan *types.ReallocResourceMessage,
nodeContainersInfo nodeContainers,
cpu float64, bindCPU types.BindCPUOptions,
memory int64, memoryLimit types.MemoryLimitOptions,
volumes types.VolumeBindings) {

volCPUMemNodeContainersInfo, hardVbsForContainer := calVolCPUMemNodeContainersInfo(ch, nodeContainersInfo, cpu, memory, volumes)
func (c *Calcium) doReallocContainer(ctx context.Context, ch chan *types.ReallocResourceMessage, nodeContainersInfo nodeContainers, opts *types.ReallocOptions) {
volCPUMemNodeContainersInfo, hardVbsForContainer := calVolCPUMemNodeContainersInfo(ch, nodeContainersInfo, opts.CPU, opts.Memory, opts.Volumes)
for newAutoVol, cpuMemNodeContainersInfo := range volCPUMemNodeContainersInfo {
for newCPU, memNodesContainers := range cpuMemNodeContainersInfo {
for newMemory, nodesContainers := range memNodesContainers {
Expand All @@ -105,14 +98,14 @@ func (c *Calcium) doReallocContainer(
node.IncrNUMANodeMemory(nodeID, container.Memory)
}
// cpu 绑定判断
switch bindCPU {
case types.ReallocKeepCPUCurrent:
switch opts.BindCPU {
case types.TriKeep:
if len(container.CPU) > 0 {
containerWithCPUBind++
}
case types.ReallocBindCPU:
case types.TriTrue:
containerWithCPUBind++
case types.ReallocFreeCPU:
case types.TriFalse:
containerWithCPUBind = 0
}
}
Expand Down Expand Up @@ -156,7 +149,7 @@ func (c *Calcium) doReallocContainer(
ctx,
// if
func(ctx context.Context) error {
return c.updateContainersResources(ctx, ch, node, containers, newResource, cpusets, hardVbsForContainer, newAutoVol, bindCPU, memoryLimit) // nolint
return c.updateContainersResources(ctx, ch, node, containers, newResource, cpusets, hardVbsForContainer, newAutoVol, opts.BindCPU, opts.MemoryLimit) // nolint
},
// then
func(ctx context.Context) (err error) {
Expand Down Expand Up @@ -189,7 +182,7 @@ func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.
node *types.Node, containers []*types.Container,
newResource *enginetypes.VirtualizationResource,
cpusets []types.CPUMap, hardVbsForContainer map[string]types.VolumeBindings, newAutoVol string,
bindCPU types.BindCPUOptions, memoryLimit types.MemoryLimitOptions) error {
bindCPU, memoryLimit types.TriOptions) error {

autoVbs, _ := types.MakeVolumeBindings(strings.Split(newAutoVol, ","))
planForContainers, err := c.reallocVolume(node, containers, autoVbs)
Expand All @@ -200,7 +193,7 @@ func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.
for _, container := range containers {
// 情况1,原来就有绑定cpu的,保持不变
// 情况2,有绑定指令,不管之前有没有cpuMap,都分配
if (len(container.CPU) > 0 && bindCPU == types.ReallocKeepCPUCurrent) || bindCPU == types.ReallocBindCPU {
if (len(container.CPU) > 0 && bindCPU == types.TriKeep) || bindCPU == types.TriTrue {
newResource.CPU = cpusets[0]
newResource.NUMANode = node.GetNUMANode(cpusets[0])
cpusets = cpusets[1:]
Expand All @@ -212,12 +205,12 @@ func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.
}

switch memoryLimit {
case types.ReallocMemoryInheritLimit:
case types.TriKeep:
newResource.SoftLimit = container.SoftLimit
case types.ReallocMemoryHardLimit:
newResource.SoftLimit = false
case types.ReallocMemorySoftLimit:
case types.TriTrue:
newResource.SoftLimit = true
case types.TriFalse:
newResource.SoftLimit = false
}

newResource.Volumes = append(newResource.Volumes, hardVbsForContainer[container.ID].ToStringSlice(false, false)...)
Expand Down
45 changes: 28 additions & 17 deletions cluster/calcium/realloc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ import (
"github.com/stretchr/testify/mock"
)

func newReallocOptions(ids []string, cpu float64, memory int64, vbs types.VolumeBindings, bindCPU, memoryLimit types.TriOptions) *types.ReallocOptions {
return &types.ReallocOptions{
IDs: ids,
CPU: cpu,
Memory: memory,
Volumes: vbs,
BindCPU: bindCPU,
MemoryLimit: memoryLimit,
}
}

func TestRealloc(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
Expand Down Expand Up @@ -72,36 +83,36 @@ func TestRealloc(t *testing.T) {
store.On("GetContainers", mock.Anything, []string{"c1", "c2"}).Return([]*types.Container{c1, c2}, nil)
// failed by lock
store.On("CreateLock", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err := c.ReallocResource(ctx, []string{"c1"}, -1, types.ReallocKeepCPUCurrent, 2*int64(units.GiB), types.ReallocMemoryInheritLimit, nil)
ch, err := c.ReallocResource(ctx, newReallocOptions([]string{"c1"}, -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
// failed by GetPod
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, types.ErrNoETCD).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, -1, types.ReallocKeepCPUCurrent, 2*int64(units.GiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1"}, -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
// failed by newCPU < 0
ch, err = c.ReallocResource(ctx, []string{"c1"}, -1, types.ReallocKeepCPUCurrent, 2*int64(units.GiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1"}, -1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
// failed by GetNode
store.On("GetNode", mock.Anything, "node1").Return(nil, types.ErrNoETCD).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, types.ReallocKeepCPUCurrent, 2*int64(units.GiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1"}, 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
store.On("GetNode", mock.Anything, "node1").Return(node1, nil)
// failed by memory not enough
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, types.ReallocKeepCPUCurrent, 2*int64(units.GiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1"}, 0.1, 2*int64(units.GiB), nil, types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
Expand All @@ -114,14 +125,14 @@ func TestRealloc(t *testing.T) {
"c1": {{types.MustToVolumeBinding("AUTO:/data:rw:50"): types.VolumeMap{"/dir0": 50}}},
}
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"})).Return(nil, nodeVolumePlans, 1, nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, types.ReallocKeepCPUCurrent, 2*int64(units.MiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1"}, 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
// failed by wrong total
simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, 0, nil).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, types.ReallocKeepCPUCurrent, 2*int64(units.MiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1"}, 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
Expand All @@ -147,7 +158,7 @@ func TestRealloc(t *testing.T) {
Engine: engine,
Endpoint: "http://1.1.1.1:1",
}
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, types.ReallocKeepCPUCurrent, 2*int64(units.MiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1", "c2"}, 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
Expand All @@ -159,7 +170,7 @@ func TestRealloc(t *testing.T) {
store.On("UpdateNode", mock.Anything, mock.Anything).Return(nil)
// failed by update container
store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Once()
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, types.ReallocKeepCPUCurrent, 2*int64(units.MiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1", "c2"}, 0.1, 2*int64(units.MiB), nil, types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
Expand All @@ -174,21 +185,21 @@ func TestRealloc(t *testing.T) {
},
}
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data:rw:100"})).Return(nil, nodeVolumePlans, 4, nil).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, types.ReallocKeepCPUCurrent, int64(units.MiB), types.ReallocMemoryInheritLimit, types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}))
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}), types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
// failed by volume schedule error
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nil, 0, types.ErrInsufficientVolume).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, types.ReallocKeepCPUCurrent, int64(units.MiB), types.ReallocMemoryInheritLimit, types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}))
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}), types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
}
// failed due to re-volume plan less then container number
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nodeVolumePlans, 0, nil).Twice()
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, types.ReallocKeepCPUCurrent, int64(units.MiB), types.ReallocMemoryInheritLimit, types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}))
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c1", "c2"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}), types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
Expand Down Expand Up @@ -252,7 +263,7 @@ func TestRealloc(t *testing.T) {
store.On("GetNode", mock.Anything, "node2").Return(node2, nil)
store.On("GetContainers", mock.Anything, []string{"c3", "c4"}).Return([]*types.Container{c3, c4}, nil)
store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrBadContainerID).Twice()
ch, err = c.ReallocResource(ctx, []string{"c3", "c4"}, 0.1, types.ReallocKeepCPUCurrent, 2*int64(units.MiB), types.ReallocMemoryInheritLimit, types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}))
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c3", "c4"}, 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}), types.TriKeep, types.TriKeep))
assert.NoError(t, err)
for r := range ch {
assert.Error(t, r.Error)
Expand Down Expand Up @@ -467,28 +478,28 @@ func TestReallocBindCpu(t *testing.T) {
engine.On("VirtualizationUpdateResource", mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateNode", mock.Anything, mock.Anything).Return(nil)
store.On("UpdateContainer", mock.Anything, mock.Anything).Return(nil)
ch, err := c.ReallocResource(ctx, []string{"c5"}, 0.1, types.ReallocFreeCPU, 2*int64(units.MiB), types.ReallocMemoryInheritLimit, nil)
ch, err := c.ReallocResource(ctx, newReallocOptions([]string{"c5"}, 0.1, 2*int64(units.MiB), nil, types.TriFalse, types.TriKeep))
for r := range ch {
assert.NoError(t, r.Error)
}
assert.NoError(t, err)
assert.Empty(t, c5.CPU)

ch, err = c.ReallocResource(ctx, []string{"c6"}, 0.1, types.ReallocBindCPU, 2*int64(units.MiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c6"}, 0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep))
for r := range ch {
assert.NoError(t, r.Error)
}
assert.NoError(t, err)
assert.NotEmpty(t, c6.CPU)

ch, err = c.ReallocResource(ctx, []string{"c6", "c5"}, -0.1, types.ReallocBindCPU, 2*int64(units.MiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c6", "c5"}, -0.1, 2*int64(units.MiB), nil, types.TriTrue, types.TriKeep))
for r := range ch {
assert.NoError(t, r.Error)
}
assert.NoError(t, err)
assert.NotEmpty(t, c6.CPU)
assert.NotEmpty(t, c5.CPU)
ch, err = c.ReallocResource(ctx, []string{"c6", "c5"}, -0.1, types.ReallocFreeCPU, 2*int64(units.MiB), types.ReallocMemoryInheritLimit, nil)
ch, err = c.ReallocResource(ctx, newReallocOptions([]string{"c6", "c5"}, -0.1, 2*int64(units.MiB), nil, types.TriFalse, types.TriKeep))
for r := range ch {
assert.NoError(t, r.Error)
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ type Cluster interface {
DissociateContainer(ctx context.Context, IDs []string) (chan *types.DissociateContainerMessage, error)
ControlContainer(ctx context.Context, IDs []string, t string, force bool) (chan *types.ControlContainerMessage, error)
ExecuteContainer(ctx context.Context, opts *types.ExecuteContainerOptions, inCh <-chan []byte) chan *types.AttachContainerMessage
ReallocResource(ctx context.Context, IDs []string, cpu float64, bindCPU types.BindCPUOptions, memory int64, memoryLimit types.MemoryLimitOptions, volumes types.VolumeBindings) (chan *types.ReallocResourceMessage, error)
ReallocResource(ctx context.Context, opts *types.ReallocOptions) (chan *types.ReallocResourceMessage, error)
LogStream(ctx context.Context, opts *types.LogStreamOptions) (chan *types.LogStreamMessage, error)
RunAndWait(ctx context.Context, opts *types.DeployOptions, inCh <-chan []byte) (<-chan *types.AttachContainerMessage, error)
// finalizer
Expand Down
14 changes: 7 additions & 7 deletions cluster/mocks/Cluster.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 92447db

Please sign in to comment.