diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go index 81a5a6e43..033842f53 100644 --- a/cluster/calcium/realloc.go +++ b/cluster/calcium/realloc.go @@ -109,7 +109,7 @@ func (c *Calcium) doReallocContainerWithMemoryPrior( // 不考虑 memory < 0 对于系统而言,这时候 realloc 只不过使得 node 记录的内存 > 容器拥有内存总和,并不会 OOM if memory > 0 { if err := c.doReallocNodesMemory(ctx, nodeContainersInfo, memory); err != nil { - log.Errorf("[doReallocContainerWithMemoryPrior] realloc memory failed %v", err) + log.Errorf("[doReallocContainerWithMemoryPrior] Realloc memory failed %v", err) for _, containers := range nodeContainersInfo { for _, container := range containers { ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} @@ -140,20 +140,20 @@ func (c *Calcium) doUpdateContainerWithMemoryPrior( cpu float64, memory int64) { for node, containers := range nodeContainersInfo { for _, container := range containers { - newCPU := utils.Round(container.Quota+cpu, 2) + newCPU := utils.Round(container.Quota + cpu) newMemory := container.Memory + memory // 内存不能低于 4MB if newCPU <= 0 || newMemory <= minMemory { - log.Errorf("[doUpdateContainerWithMemoryPrior] new resource invaild %s, %f, %d", container.ID, newCPU, newMemory) + log.Errorf("[doUpdateContainerWithMemoryPrior] New resource invaild %s, %f, %d", container.ID, newCPU, newMemory) ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} continue } - log.Debugf("[doUpdateContainerWithMemoryPrior] container %s: cpu: %f, mem: %d", container.ID, newCPU, newMemory) + log.Debugf("[doUpdateContainerWithMemoryPrior] Container %s: cpu: %f, mem: %d", container.ID, newCPU, newMemory) // CPUQuota not cpu newResource := makeResourceSetting(newCPU, newMemory, nil, container.SoftLimit) updateConfig := enginecontainer.UpdateConfig{Resources: newResource} if _, err := node.Engine.ContainerUpdate(ctx, container.ID, updateConfig); err != nil { - log.Errorf("[doUpdateContainerWithMemoryPrior] update container failed %v, %s", err, container.ID) + log.Errorf("[doUpdateContainerWithMemoryPrior] Update container failed %v, %s", err, container.ID) ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} continue } @@ -167,14 +167,14 @@ func (c *Calcium) doUpdateContainerWithMemoryPrior( container.Quota = newCPU container.Memory = newMemory if err := c.store.UpdateContainer(ctx, container); err != nil { - log.Warnf("[doUpdateContainerWithMemoryPrior] update container %s failed %v", container.ID, err) + log.Warnf("[doUpdateContainerWithMemoryPrior] Update container %s failed %v", container.ID, err) ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} continue } ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true} } if err := c.store.UpdateNode(ctx, node); err != nil { - log.Errorf("[doUpdateContainerWithMemoryPrior] update node %s failed %s", node.Name, err) + log.Errorf("[doUpdateContainerWithMemoryPrior] Update node %s failed %s", node.Name, err) litter.Dump(node) return } @@ -190,10 +190,10 @@ func (c *Calcium) doReallocContainersWithCPUPrior( cpuMemNodeContainersInfo := cpuMemNodeContainers{} for node, containers := range nodeContainersInfo { for _, container := range containers { - newCPU := utils.Round(container.Quota+cpu, 2) + newCPU := utils.Round(container.Quota + cpu) newMem := container.Memory + memory if newCPU < 0 || newMem < minMemory { - log.Errorf("[reallocContainersWithCPUPrior] new resource invaild %s, %f, %d", container.ID, newCPU, newMem) + log.Errorf("[reallocContainersWithCPUPrior] New resource invaild %s, %f, %d", container.ID, newCPU, newMem) ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} continue } @@ -212,7 +212,7 @@ func (c *Calcium) doReallocContainersWithCPUPrior( cpuMemNodesMapInfo, err := c.doReallocNodesCPUMem(ctx, cpuMemNodeContainersInfo) if err != nil { - log.Errorf("[doReallocContainersWithCPUPrior] realloc cpu resource failed %v", err) + log.Errorf("[doReallocContainersWithCPUPrior] Realloc cpu resource failed %v", err) for _, memNodeMap := range cpuMemNodeContainersInfo { for _, nodeInfoMap := range memNodeMap { for _, containers := range nodeInfoMap { @@ -258,7 +258,7 @@ func (c *Calcium) doReallocNodesCPUMem( } // 重新计算需求 - nodesInfo, nodeCPUPlans, total, err := c.scheduler.SelectCPUNodes(nodesInfo, requireCPU, requireMemory) + _, nodeCPUPlans, total, err := c.scheduler.SelectCPUNodes(nodesInfo, requireCPU, requireMemory) if err != nil { return nil, err } @@ -296,7 +296,7 @@ func (c *Calcium) doUpdateContainersWithCPUPrior( resource := makeResourceSetting(newCPU, newMem, cpuPlan, container.SoftLimit) updateConfig := enginecontainer.UpdateConfig{Resources: resource} if _, err := node.Engine.ContainerUpdate(ctx, container.ID, updateConfig); err != nil { - log.Errorf("[doReallocContainersWithCPUPrior] update container failed %v", err) + log.Errorf("[doReallocContainersWithCPUPrior] Update container failed %v", err) ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} continue } @@ -307,14 +307,14 @@ func (c *Calcium) doUpdateContainersWithCPUPrior( container.Quota = newCPU container.Memory = newMem if err := c.store.UpdateContainer(ctx, container); err != nil { - log.Warnf("[doReallocContainersWithCPUPrior] update container %s failed %v", container.ID, err) + log.Warnf("[doReallocContainersWithCPUPrior] Update container %s failed %v", container.ID, err) ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} continue } ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true} } if err := c.store.UpdateNode(ctx, node); err != nil { - log.Errorf("[doReallocContainersWithCPUPrior] update node %s failed %s", node.Name, err) + log.Errorf("[doReallocContainersWithCPUPrior] Update node %s failed %s", node.Name, err) litter.Dump(node) return } diff --git a/cluster/calcium/realloc_test.go b/cluster/calcium/realloc_test.go index c054b9a2d..8130680e1 100644 --- a/cluster/calcium/realloc_test.go +++ b/cluster/calcium/realloc_test.go @@ -12,6 +12,7 @@ import ( enginemocks "github.com/projecteru2/core/3rdmocks" lockmocks "github.com/projecteru2/core/lock/mocks" + schedulermocks "github.com/projecteru2/core/scheduler/mocks" storemocks "github.com/projecteru2/core/store/mocks" "github.com/projecteru2/core/types" "github.com/stretchr/testify/mock" @@ -61,25 +62,24 @@ func TestReallocMem(t *testing.T) { store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) store.On("GetContainer", mock.Anything, mock.Anything).Return(c1, nil) - store.On("GetPod", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() // get pod failed + store.On("GetPod", mock.Anything, mock.Anything).Return(nil, types.ErrNoETCD).Once() ch, err = c.ReallocResource(ctx, []string{"c1"}, 0, 0) assert.NoError(t, err) for c := range ch { assert.False(t, c.Success) } store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil) - - store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrPodNoNodes).Once() // get node failed // pod favor invaild + store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(nil, types.ErrPodNoNodes).Once() ch, err = c.ReallocResource(ctx, []string{"c1"}, 0, 0) assert.NoError(t, err) for c := range ch { assert.False(t, c.Success) } - // node cap not enough store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(node1, nil) + // node cap not enough pod1.Favor = scheduler.MEMORY_PRIOR ch, err = c.ReallocResource(ctx, []string{"c1"}, 0, 2*types.GByte) assert.NoError(t, err) @@ -122,3 +122,108 @@ func TestReallocMem(t *testing.T) { } assert.Equal(t, node1.MemCap-origin, int64(1)) } + +func TestReallocCPU(t *testing.T) { + c := NewTestCluster() + ctx := context.Background() + store := &storemocks.Store{} + c.store = store + + lock := &lockmocks.DistributedLock{} + lock.On("Lock", mock.Anything).Return(nil) + lock.On("Unlock", mock.Anything).Return(nil) + + engine := &enginemocks.APIClient{} + engine.On("ContainerInspect", mock.Anything, mock.Anything).Return(enginetypes.ContainerJSON{}, nil) + + pod1 := &types.Pod{ + Name: "p1", + Favor: "wtf", + } + + node1 := &types.Node{ + Name: "node1", + MemCap: types.GByte, + CPU: types.CPUMap{"0": 10, "1": 70, "2": 10, "3": 100}, + Engine: engine, + Endpoint: "http://1.1.1.1:1", + } + + c1 := &types.Container{ + ID: "c1", + Podname: "p1", + Engine: engine, + Memory: 5 * types.MByte, + HostIP: node1.GetIP(), + Quota: 0.9, + CPU: types.CPUMap{"2": 90}, + } + + store.On("CreateLock", mock.Anything, mock.Anything).Return(lock, nil) + store.On("GetContainer", mock.Anything, mock.Anything).Return(c1, nil) + store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil) + store.On("GetNode", mock.Anything, mock.Anything, mock.Anything).Return(node1, nil) + pod1.Favor = scheduler.CPU_PRIOR + // wrong cpu + ch, err := c.ReallocResource(ctx, []string{"c1"}, -1, 2*types.GByte) + assert.NoError(t, err) + for c := range ch { + assert.False(t, c.Success) + } + simpleMockScheduler := &schedulermocks.Scheduler{} + c.scheduler = simpleMockScheduler + // wrong selectCPUNodes + simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, 0, types.ErrInsufficientMEM).Once() + ch, err = c.ReallocResource(ctx, []string{"c1"}, 1, 2*types.GByte) + assert.NoError(t, err) + for c := range ch { + assert.False(t, c.Success) + } + // wrong total + simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, 0, nil).Once() + ch, err = c.ReallocResource(ctx, []string{"c1"}, 1, 2*types.GByte) + assert.NoError(t, err) + for c := range ch { + assert.False(t, c.Success) + } + // good to go + nodeCPUPlans := map[string][]types.CPUMap{ + node1.Name: []types.CPUMap{ + types.CPUMap{ + "3": 100, + }, + types.CPUMap{ + "2": 100, + }, + }, + } + simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nodeCPUPlans, 2, nil) + // apply resource failed + // update node failed + engine.On("ContainerUpdate", mock.Anything, mock.Anything, mock.Anything).Return(containertypes.ContainerUpdateOKBody{}, types.ErrBadContainerID).Once() + store.On("UpdateNode", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once() + ch, err = c.ReallocResource(ctx, []string{"c1"}, 1, 1) + assert.NoError(t, err) + for c := range ch { + assert.False(t, c.Success) + } + engine.On("ContainerUpdate", mock.Anything, mock.Anything, mock.Anything).Return(containertypes.ContainerUpdateOKBody{}, nil) + store.On("UpdateNode", mock.Anything, mock.Anything).Return(nil) + // update container failed + store.On("UpdateContainer", mock.Anything, mock.Anything).Return(types.ErrNoETCD).Once() + origin := node1.MemCap + ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 1) + assert.NoError(t, err) + for c := range ch { + assert.False(t, c.Success) + } + assert.Equal(t, origin-node1.MemCap, int64(1)) + assert.Equal(t, node1.CPU["3"], 0) + // success + store.On("UpdateContainer", mock.Anything, mock.Anything).Return(nil) + ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 1) + assert.NoError(t, err) + for c := range ch { + assert.True(t, c.Success) + } +} diff --git a/utils/utils.go b/utils/utils.go index 0947d5119..e98e0696b 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -280,9 +280,8 @@ func CreateTarStream(path string) (io.ReadCloser, error) { } // Round for float64 to int -func Round(f float64, n int) float64 { - n10 := math.Pow10(n) - return math.Trunc((f+0.5/n10)*n10) / n10 +func Round(f float64) float64 { + return math.Round(f*100) / 100 } // copied from https://gist.github.com/jmervine/d88c75329f98e09f5c87 diff --git a/utils/utils_test.go b/utils/utils_test.go index b454af6ee..7a9891dcd 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -233,11 +233,11 @@ func TestRound(t *testing.T) { return strconv.FormatFloat(f, 'f', -1, 64) } a := 0.0199999998 - assert.Equal(t, f(Round(a, 2)), "0.02") + assert.Equal(t, f(Round(a)), "0.02") a = 0.1999998 - assert.Equal(t, f(Round(a, 2)), "0.2") + assert.Equal(t, f(Round(a)), "0.2") a = 1.999998 - assert.Equal(t, f(Round(a, 2)), "2") + assert.Equal(t, f(Round(a)), "2") a = 19.99998 - assert.Equal(t, f(Round(a, 2)), "20") + assert.Equal(t, f(Round(a)), "20") }