Skip to content

Commit

Permalink
add realloc cpu tests
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Nov 26, 2018
1 parent 1f272d6 commit a99d831
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 25 deletions.
28 changes: 14 additions & 14 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (c *Calcium) doReallocContainerWithMemoryPrior(
// 不考虑 memory < 0 对于系统而言,这时候 realloc 只不过使得 node 记录的内存 > 容器拥有内存总和,并不会 OOM
if memory > 0 {
if err := c.doReallocNodesMemory(ctx, nodeContainersInfo, memory); err != nil {
log.Errorf("[doReallocContainerWithMemoryPrior] realloc memory failed %v", err)
log.Errorf("[doReallocContainerWithMemoryPrior] Realloc memory failed %v", err)
for _, containers := range nodeContainersInfo {
for _, container := range containers {
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
Expand Down Expand Up @@ -140,20 +140,20 @@ func (c *Calcium) doUpdateContainerWithMemoryPrior(
cpu float64, memory int64) {
for node, containers := range nodeContainersInfo {
for _, container := range containers {
newCPU := utils.Round(container.Quota+cpu, 2)
newCPU := utils.Round(container.Quota + cpu)
newMemory := container.Memory + memory
// 内存不能低于 4MB
if newCPU <= 0 || newMemory <= minMemory {
log.Errorf("[doUpdateContainerWithMemoryPrior] new resource invaild %s, %f, %d", container.ID, newCPU, newMemory)
log.Errorf("[doUpdateContainerWithMemoryPrior] New resource invaild %s, %f, %d", container.ID, newCPU, newMemory)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
log.Debugf("[doUpdateContainerWithMemoryPrior] container %s: cpu: %f, mem: %d", container.ID, newCPU, newMemory)
log.Debugf("[doUpdateContainerWithMemoryPrior] Container %s: cpu: %f, mem: %d", container.ID, newCPU, newMemory)
// CPUQuota not cpu
newResource := makeResourceSetting(newCPU, newMemory, nil, container.SoftLimit)
updateConfig := enginecontainer.UpdateConfig{Resources: newResource}
if _, err := node.Engine.ContainerUpdate(ctx, container.ID, updateConfig); err != nil {
log.Errorf("[doUpdateContainerWithMemoryPrior] update container failed %v, %s", err, container.ID)
log.Errorf("[doUpdateContainerWithMemoryPrior] Update container failed %v, %s", err, container.ID)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
Expand All @@ -167,14 +167,14 @@ func (c *Calcium) doUpdateContainerWithMemoryPrior(
container.Quota = newCPU
container.Memory = newMemory
if err := c.store.UpdateContainer(ctx, container); err != nil {
log.Warnf("[doUpdateContainerWithMemoryPrior] update container %s failed %v", container.ID, err)
log.Warnf("[doUpdateContainerWithMemoryPrior] Update container %s failed %v", container.ID, err)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true}
}
if err := c.store.UpdateNode(ctx, node); err != nil {
log.Errorf("[doUpdateContainerWithMemoryPrior] update node %s failed %s", node.Name, err)
log.Errorf("[doUpdateContainerWithMemoryPrior] Update node %s failed %s", node.Name, err)
litter.Dump(node)
return
}
Expand All @@ -190,10 +190,10 @@ func (c *Calcium) doReallocContainersWithCPUPrior(
cpuMemNodeContainersInfo := cpuMemNodeContainers{}
for node, containers := range nodeContainersInfo {
for _, container := range containers {
newCPU := utils.Round(container.Quota+cpu, 2)
newCPU := utils.Round(container.Quota + cpu)
newMem := container.Memory + memory
if newCPU < 0 || newMem < minMemory {
log.Errorf("[reallocContainersWithCPUPrior] new resource invaild %s, %f, %d", container.ID, newCPU, newMem)
log.Errorf("[reallocContainersWithCPUPrior] New resource invaild %s, %f, %d", container.ID, newCPU, newMem)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
Expand All @@ -212,7 +212,7 @@ func (c *Calcium) doReallocContainersWithCPUPrior(

cpuMemNodesMapInfo, err := c.doReallocNodesCPUMem(ctx, cpuMemNodeContainersInfo)
if err != nil {
log.Errorf("[doReallocContainersWithCPUPrior] realloc cpu resource failed %v", err)
log.Errorf("[doReallocContainersWithCPUPrior] Realloc cpu resource failed %v", err)
for _, memNodeMap := range cpuMemNodeContainersInfo {
for _, nodeInfoMap := range memNodeMap {
for _, containers := range nodeInfoMap {
Expand Down Expand Up @@ -258,7 +258,7 @@ func (c *Calcium) doReallocNodesCPUMem(
}

// 重新计算需求
nodesInfo, nodeCPUPlans, total, err := c.scheduler.SelectCPUNodes(nodesInfo, requireCPU, requireMemory)
_, nodeCPUPlans, total, err := c.scheduler.SelectCPUNodes(nodesInfo, requireCPU, requireMemory)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -296,7 +296,7 @@ func (c *Calcium) doUpdateContainersWithCPUPrior(
resource := makeResourceSetting(newCPU, newMem, cpuPlan, container.SoftLimit)
updateConfig := enginecontainer.UpdateConfig{Resources: resource}
if _, err := node.Engine.ContainerUpdate(ctx, container.ID, updateConfig); err != nil {
log.Errorf("[doReallocContainersWithCPUPrior] update container failed %v", err)
log.Errorf("[doReallocContainersWithCPUPrior] Update container failed %v", err)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
Expand All @@ -307,14 +307,14 @@ func (c *Calcium) doUpdateContainersWithCPUPrior(
container.Quota = newCPU
container.Memory = newMem
if err := c.store.UpdateContainer(ctx, container); err != nil {
log.Warnf("[doReallocContainersWithCPUPrior] update container %s failed %v", container.ID, err)
log.Warnf("[doReallocContainersWithCPUPrior] Update container %s failed %v", container.ID, err)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true}
}
if err := c.store.UpdateNode(ctx, node); err != nil {
log.Errorf("[doReallocContainersWithCPUPrior] update node %s failed %s", node.Name, err)
log.Errorf("[doReallocContainersWithCPUPrior] Update node %s failed %s", node.Name, err)
litter.Dump(node)
return
}
Expand Down
113 changes: 109 additions & 4 deletions cluster/calcium/realloc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

enginemocks "github.com/projecteru2/core/3rdmocks"
lockmocks "github.com/projecteru2/core/lock/mocks"
schedulermocks "github.com/projecteru2/core/scheduler/mocks"
storemocks "github.com/projecteru2/core/store/mocks"
"github.com/projecteru2/core/types"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -61,25 +62,24 @@ func TestReallocMem(t *testing.T) {

store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
store.On("GetContainer", mock.Anything, mock.Anything).Return(c1, nil)
store.On("GetPod", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
// get pod failed
store.On("GetPod", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0, 0)
assert.NoError(t, err)
for c := range ch {
assert.False(t, c.Success)
}
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)

store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrPodNoNodes).Once()
// get node failed
// pod favor invaild
store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrPodNoNodes).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0, 0)
assert.NoError(t, err)
for c := range ch {
assert.False(t, c.Success)
}
// node cap not enough
store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(node1, nil)
// node cap not enough
pod1.Favor = scheduler.MEMORY_PRIOR
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0, 2*types.GByte)
assert.NoError(t, err)
Expand Down Expand Up @@ -122,3 +122,108 @@ func TestReallocMem(t *testing.T) {
}
assert.Equal(t, node1.MemCap-origin, int64(1))
}

func TestReallocCPU(t *testing.T) {
c := NewTestCluster()
ctx := context.Background()
store := &storemocks.Store{}
c.store = store

lock := &lockmocks.DistributedLock{}
lock.On("Lock", mock.Anything).Return(nil)
lock.On("Unlock", mock.Anything).Return(nil)

engine := &enginemocks.APIClient{}
engine.On("ContainerInspect", mock.Anything, mock.Anything).Return(enginetypes.ContainerJSON{}, nil)

pod1 := &types.Pod{
Name: "p1",
Favor: "wtf",
}

node1 := &types.Node{
Name: "node1",
MemCap: types.GByte,
CPU: types.CPUMap{"0": 10, "1": 70, "2": 10, "3": 100},
Engine: engine,
Endpoint: "http://1.1.1.1:1",
}

c1 := &types.Container{
ID: "c1",
Podname: "p1",
Engine: engine,
Memory: 5 * types.MByte,
HostIP: node1.GetIP(),
Quota: 0.9,
CPU: types.CPUMap{"2": 90},
}

store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil)
store.On("GetContainer", mock.Anything, mock.Anything).Return(c1, nil)
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(node1, nil)
pod1.Favor = scheduler.CPU_PRIOR
// wrong cpu
ch, err := c.ReallocResource(ctx, []string{"c1"}, -1, 2*types.GByte)
assert.NoError(t, err)
for c := range ch {
assert.False(t, c.Success)
}
simpleMockScheduler := &schedulermocks.Scheduler{}
c.scheduler = simpleMockScheduler
// wrong selectCPUNodes
simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, 0, types.ErrInsufficientMEM).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 1, 2*types.GByte)
assert.NoError(t, err)
for c := range ch {
assert.False(t, c.Success)
}
// wrong total
simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, 0, nil).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 1, 2*types.GByte)
assert.NoError(t, err)
for c := range ch {
assert.False(t, c.Success)
}
// good to go
nodeCPUPlans := map[string][]types.CPUMap{
node1.Name: []types.CPUMap{
types.CPUMap{
"3": 100,
},
types.CPUMap{
"2": 100,
},
},
}
simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nodeCPUPlans, 2, nil)
// apply resource failed
// update node failed
engine.On("ContainerUpdate", mock.Anything, mock.Anything, mock.Anything).Return(containertypes.ContainerUpdateOKBody{}, types.ErrBadContainerID).Once()
store.On("UpdateNode", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 1, 1)
assert.NoError(t, err)
for c := range ch {
assert.False(t, c.Success)
}
engine.On("ContainerUpdate", mock.Anything, mock.Anything, mock.Anything).Return(containertypes.ContainerUpdateOKBody{}, nil)
store.On("UpdateNode", mock.Anything, mock.Anything).Return(nil)
// update container failed
store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once()
origin := node1.MemCap
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 1)
assert.NoError(t, err)
for c := range ch {
assert.False(t, c.Success)
}
assert.Equal(t, origin-node1.MemCap, int64(1))
assert.Equal(t, node1.CPU["3"], 0)
// success
store.On("UpdateContainer", mock.Anything, mock.Anything).Return(nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 1)
assert.NoError(t, err)
for c := range ch {
assert.True(t, c.Success)
}
}
5 changes: 2 additions & 3 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,8 @@ func CreateTarStream(path string) (io.ReadCloser, error) {
}

// Round for float64 to int
func Round(f float64, n int) float64 {
n10 := math.Pow10(n)
return math.Trunc((f+0.5/n10)*n10) / n10
func Round(f float64) float64 {
return math.Round(f*100) / 100
}

// copied from https://gist.github.com/jmervine/d88c75329f98e09f5c87
Expand Down
8 changes: 4 additions & 4 deletions utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,11 @@ func TestRound(t *testing.T) {
return strconv.FormatFloat(f, 'f', -1, 64)
}
a := 0.0199999998
assert.Equal(t, f(Round(a, 2)), "0.02")
assert.Equal(t, f(Round(a)), "0.02")
a = 0.1999998
assert.Equal(t, f(Round(a, 2)), "0.2")
assert.Equal(t, f(Round(a)), "0.2")
a = 1.999998
assert.Equal(t, f(Round(a, 2)), "2")
assert.Equal(t, f(Round(a)), "2")
a = 19.99998
assert.Equal(t, f(Round(a, 2)), "20")
assert.Equal(t, f(Round(a)), "20")
}

0 comments on commit a99d831

Please sign in to comment.