Commit

monopoly volume schedule (#164)
* refactor: replace primitive with objects: []string -> VolumeBindings

* rescue unittest for refactor

* add InitVolume to NodeInfo

* monopoly volume schedule

* move volume binding types

* fix matching volume plans with volume bindings

* unittest schedule monopoly volume

* unittest for types module

* normalize volume strings

* soothe lint
zc authored Jan 31, 2020
1 parent 6f1fe56 commit 1931489
Showing 29 changed files with 842 additions and 323 deletions.
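The core of this commit is the move from raw volume strings such as "AUTO:/data:rw:50" to typed bindings: []string becomes types.VolumeBindings, and types.VolumePlan now maps each requested binding to the VolumeMap it was scheduled onto. The actual type definitions live in the types package and are not part of the hunks shown below; the following is only a rough sketch of the shape implied by the call sites (field names and constructors are inferred here, not copied from the repository):

```go
package types

import (
	"fmt"
	"strconv"
	"strings"
)

// VolumeBinding is the typed form of one "src:dst:flags:size" string,
// e.g. "AUTO:/data:rw:50". Field names here are assumptions.
type VolumeBinding struct {
	Source      string
	Destination string
	Flags       string
	SizeInBytes int64
}

// VolumeBindings replaces the old []string representation.
type VolumeBindings []VolumeBinding

// VolumeMap records how much space each backing path holds, e.g. {"/dir0": 50}.
type VolumeMap map[string]int64

// VolumePlan maps a requested binding onto the VolumeMap chosen for it.
type VolumePlan map[VolumeBinding]VolumeMap

// MakeVolumeBinding parses one binding string (sketch of the assumed constructor).
// Bindings may omit flags and size, e.g. "/tmp:/tmp" in the tests below.
func MakeVolumeBinding(volume string) (VolumeBinding, error) {
	parts := strings.Split(volume, ":")
	if len(parts) < 2 || len(parts) > 4 {
		return VolumeBinding{}, fmt.Errorf("invalid volume binding: %s", volume)
	}
	vb := VolumeBinding{Source: parts[0], Destination: parts[1]}
	if len(parts) > 2 {
		vb.Flags = parts[2]
	}
	if len(parts) > 3 {
		size, err := strconv.ParseInt(parts[3], 10, 64)
		if err != nil {
			return VolumeBinding{}, err
		}
		vb.SizeInBytes = size
	}
	return vb, nil
}

// MustToVolumeBinding is the panicking variant the tests below rely on.
func MustToVolumeBinding(volume string) VolumeBinding {
	vb, err := MakeVolumeBinding(volume)
	if err != nil {
		panic(err)
	}
	return vb
}
```

MakeVolumeBindings and MustToVolumeBindings, used throughout the test diff, would then just be the slice-level counterparts of these two constructors.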
11 changes: 3 additions & 8 deletions cluster/calcium/create.go
@@ -68,7 +68,7 @@ func (c *Calcium) doCreateContainer(ctx context.Context, opts *types.DeployOptio
ch <- m
if m.Error != nil && m.ContainerID == "" {
if err := c.withNodeLocked(ctx, nodeInfo.Name, func(node *types.Node) error {
return c.store.UpdateNodeResource(ctx, node, m.CPU, opts.CPUQuota, opts.Memory, opts.Storage, m.VolumePlan.Merge(), store.ActionIncr)
return c.store.UpdateNodeResource(ctx, node, m.CPU, opts.CPUQuota, opts.Memory, opts.Storage, m.VolumePlan.IntoVolumeMap(), store.ActionIncr)
}); err != nil {
log.Errorf("[doCreateContainer] Reset node %s failed %v", nodeInfo.Name, err)
}
@@ -100,7 +100,7 @@ func (c *Calcium) doCreateContainerOnNode(ctx context.Context, nodeInfo types.No
if len(nodeInfo.CPUPlan) > 0 {
cpu = nodeInfo.CPUPlan[i]
}
volumePlan := map[string]types.VolumeMap{}
volumePlan := types.VolumePlan{}
if len(nodeInfo.VolumePlans) > 0 {
volumePlan = nodeInfo.VolumePlans[i]
}
@@ -274,16 +274,11 @@ func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, volumeP
config.Image = opts.Image
config.Stdin = opts.OpenStdin
config.Hosts = opts.ExtraHosts
config.Volumes = make([]string, len(opts.Volumes))
config.Volumes = opts.Volumes.ApplyPlan(volumePlan).ToStringSlice(false, true)
config.Debug = opts.Debug
config.Network = opts.NetworkMode
config.Networks = opts.Networks

// volumes
for idx, volume := range opts.Volumes {
config.Volumes[idx] = volumePlan.GetVolumeString(volume)
}

// entry
entry := opts.Entrypoint
config.WorkingDir = entry.Dir
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
@@ -20,7 +20,7 @@ func (c *Calcium) DissociateContainer(ctx context.Context, IDs []string) (chan *
return err
}
log.Infof("[DissociateContainer] Container %s dissociated", container.ID)
return c.store.UpdateNodeResource(ctx, node, container.CPU, container.Quota, container.Memory, container.Storage, container.VolumePlan.Merge(), store.ActionIncr)
return c.store.UpdateNodeResource(ctx, node, container.CPU, container.Quota, container.Memory, container.Storage, container.VolumePlan.IntoVolumeMap(), store.ActionIncr)
})
})
if err != nil {
60 changes: 15 additions & 45 deletions cluster/calcium/helper.go
@@ -5,8 +5,6 @@ import (
"fmt"
"io"
"io/ioutil"
"strconv"
"strings"

"bufio"

@@ -140,20 +138,21 @@ func getNodesInfo(nodes map[string]*types.Node, cpu float64, memory, storage int
result := []types.NodeInfo{}
for _, node := range nodes {
nodeInfo := types.NodeInfo{
Name: node.Name,
CPUMap: node.CPU,
VolumeMap: node.Volume,
MemCap: node.MemCap,
StorageCap: node.AvailableStorage(),
CPURate: cpu / float64(len(node.InitCPU)),
MemRate: float64(memory) / float64(node.InitMemCap),
StorageRate: float64(storage) / float64(node.InitStorageCap),
CPUUsed: node.CPUUsed / float64(len(node.InitCPU)),
MemUsage: 1.0 - float64(node.MemCap)/float64(node.InitMemCap),
StorageUsage: node.StorageUsage(),
Capacity: 0,
Count: 0,
Deploy: 0,
Name: node.Name,
CPUMap: node.CPU,
VolumeMap: node.Volume,
InitVolumeMap: node.InitVolume,
MemCap: node.MemCap,
StorageCap: node.AvailableStorage(),
CPURate: cpu / float64(len(node.InitCPU)),
MemRate: float64(memory) / float64(node.InitMemCap),
StorageRate: float64(storage) / float64(node.InitStorageCap),
CPUUsed: node.CPUUsed / float64(len(node.InitCPU)),
MemUsage: 1.0 - float64(node.MemCap)/float64(node.InitMemCap),
StorageUsage: node.StorageUsage(),
Capacity: 0,
Count: 0,
Deploy: 0,
}
result = append(result, nodeInfo)
}
@@ -235,32 +234,3 @@ func processVirtualizationOutStream(
}()
return outCh
}

func mergeAutoVolumeRequests(volumes1 []string, volumes2 []string) (autoVolumes []string, hardVolumes []string, err error) {
sizeMap := map[string]int64{} // {"AUTO:/data:rw": 100}
for _, vol := range append(volumes1, volumes2...) {
parts := strings.Split(vol, ":")
if len(parts) != 4 || parts[0] != types.AUTO {
hardVolumes = append(hardVolumes, vol)
continue
}
size, err := strconv.ParseInt(parts[3], 10, 64)
if err != nil {
return nil, nil, err
}
prefix := strings.Join(parts[0:3], ":")
if _, ok := sizeMap[prefix]; ok {
sizeMap[prefix] += size
} else {
sizeMap[prefix] = size
}
}

for prefix, size := range sizeMap {
if size < 0 {
continue
}
autoVolumes = append(autoVolumes, fmt.Sprintf("%s:%d", prefix, size))
}
return autoVolumes, hardVolumes, nil
}
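The helper removed above is superseded by a Merge method on VolumeBindings: realloc.go below calls container.Volumes.Merge(volumes) and still gets back (autoVolumes, hardVolumes, err). Assuming the method keeps the semantics of the deleted code (sum the sizes of AUTO bindings that share src:dst:flags, pass everything else through as hard bindings, drop totals that fall below zero), a sketch built on the types sketched earlier could look like this; the real implementation in the types package may differ:

```go
// Merge folds a realloc request into the container's existing bindings.
// Sketch only: semantics copied from the removed mergeAutoVolumeRequests helper.
func (vbs VolumeBindings) Merge(vbs2 VolumeBindings) (autoVolumes, hardVolumes VolumeBindings, err error) {
	sizeMap := map[VolumeBinding]int64{} // key carries src/dst/flags; size accumulated separately
	for _, vb := range append(append(VolumeBindings{}, vbs...), vbs2...) {
		if vb.Source != "AUTO" {
			hardVolumes = append(hardVolumes, vb)
			continue
		}
		key := VolumeBinding{Source: vb.Source, Destination: vb.Destination, Flags: vb.Flags}
		sizeMap[key] += vb.SizeInBytes
	}
	for key, size := range sizeMap {
		if size < 0 {
			// the removed helper silently dropped requests whose merged size went negative
			continue
		}
		key.SizeInBytes = size
		autoVolumes = append(autoVolumes, key)
	}
	// string parsing now happens when the bindings are constructed, so nothing fails here;
	// the error return is kept to match the call site in realloc.go
	return autoVolumes, hardVolumes, nil
}
```

realloc_test.go below pins this behaviour down: c3 already holds AUTO:/data0:rw:100, the realloc request carries AUTO:/data0:rw:-50, and the scheduler is expected to be asked for AUTO:/data0:rw:50.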
65 changes: 33 additions & 32 deletions cluster/calcium/realloc.go
@@ -2,7 +2,6 @@ package calcium

import (
"context"
"sort"
"strings"
"sync"

@@ -21,7 +20,7 @@ type nodeContainers map[string][]*types.Container
type volCPUMemNodeContainers map[string]map[float64]map[int64]nodeContainers

// ReallocResource allow realloc container resource
func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64, memory int64, volumes []string) (chan *types.ReallocResourceMessage, error) {
func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64, memory int64, volumes types.VolumeBindings) (chan *types.ReallocResourceMessage, error) {
ch := make(chan *types.ReallocResourceMessage)
go func() {
defer close(ch)
@@ -74,10 +73,10 @@ func (c *Calcium) doReallocContainer(
ch chan *types.ReallocResourceMessage,
pod *types.Pod,
nodeContainersInfo nodeContainers,
cpu float64, memory int64, volumes []string) {
cpu float64, memory int64, volumes types.VolumeBindings) {

volCPUMemNodeContainersInfo := volCPUMemNodeContainers{}
hardVolumesForContainer := map[string][]string{}
hardVbsForContainer := map[string]types.VolumeBindings{}
for nodename, containers := range nodeContainersInfo {
for _, container := range containers {
newCPU := utils.Round(container.Quota + cpu)
@@ -87,33 +86,33 @@ func (c *Calcium) doReallocContainer(
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
autoVolumes, hardVolumes, err := mergeAutoVolumeRequests(container.Volumes, volumes)
hardVolumesForContainer[container.ID] = hardVolumes

autoVolumes, hardVolumes, err := container.Volumes.Merge(volumes)
hardVbsForContainer[container.ID] = hardVolumes
if err != nil {
log.Errorf("[doReallocContainer] New resource invalid %s, vol %v, err %v", container.ID, volumes, err)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
sort.Slice(autoVolumes, func(i, j int) bool { return autoVolumes[i] < autoVolumes[j] })
newVol := strings.Join(autoVolumes, ",")
newAutoVol := strings.Join(autoVolumes.ToStringSlice(true, false), ",")

if _, ok := volCPUMemNodeContainersInfo[newVol]; !ok {
volCPUMemNodeContainersInfo[newVol] = map[float64]map[int64]nodeContainers{}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol]; !ok {
volCPUMemNodeContainersInfo[newAutoVol] = map[float64]map[int64]nodeContainers{}
}
if _, ok := volCPUMemNodeContainersInfo[newVol][newCPU]; !ok {
volCPUMemNodeContainersInfo[newVol][newCPU] = map[int64]nodeContainers{}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol][newCPU]; !ok {
volCPUMemNodeContainersInfo[newAutoVol][newCPU] = map[int64]nodeContainers{}
}
if _, ok := volCPUMemNodeContainersInfo[newVol][newCPU][newMem]; !ok {
volCPUMemNodeContainersInfo[newVol][newCPU][newMem] = nodeContainers{}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem]; !ok {
volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem] = nodeContainers{}
}
if _, ok := volCPUMemNodeContainersInfo[newVol][newCPU][newMem][nodename]; !ok {
volCPUMemNodeContainersInfo[newVol][newCPU][newMem][nodename] = []*types.Container{}
if _, ok := volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename]; !ok {
volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename] = []*types.Container{}
}
volCPUMemNodeContainersInfo[newVol][newCPU][newMem][nodename] = append(volCPUMemNodeContainersInfo[newVol][newCPU][newMem][nodename], container)
volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename] = append(volCPUMemNodeContainersInfo[newAutoVol][newCPU][newMem][nodename], container)
}
}

for newVol, cpuMemNodeContainersInfo := range volCPUMemNodeContainersInfo {
for newAutoVol, cpuMemNodeContainersInfo := range volCPUMemNodeContainersInfo {
for newCPU, memNodesContainers := range cpuMemNodeContainersInfo {
for newMemory, nodesContainers := range memNodesContainers {
for nodename, containers := range nodesContainers {
@@ -125,8 +124,8 @@ func (c *Calcium) doReallocContainer(
// 不更新 etcd,内存计算
node.CPU.Add(container.CPU)
node.SetCPUUsed(container.Quota, types.DecrUsage)
node.Volume.Add(container.VolumePlan.Merge())
node.SetVolumeUsed(container.VolumePlan.Merge().Total(), types.DecrUsage)
node.Volume.Add(container.VolumePlan.IntoVolumeMap())
node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.DecrUsage)
node.MemCap += container.Memory
if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
node.IncrNUMANodeMemory(nodeID, container.Memory)
@@ -157,10 +156,11 @@ func (c *Calcium) doReallocContainer(
}

var volumePlans []types.VolumePlan
// newVol won't be empty as long as existing container bound volumes before or realloc request requires new binding
if newVol != "" {
nodesInfo := []types.NodeInfo{{Name: node.Name, VolumeMap: node.Volume}}
_, nodeVolumePlans, total, err := c.scheduler.SelectVolumeNodes(nodesInfo, strings.Split(newVol, ","))
autoVbs := types.VolumeBindings{}
if newAutoVol != "" {
nodesInfo := []types.NodeInfo{{Name: node.Name, VolumeMap: node.Volume, InitVolumeMap: node.InitVolume}}
autoVbs, _ = types.MakeVolumeBindings(strings.Split(newAutoVol, ","))
_, nodeVolumePlans, total, err := c.scheduler.SelectVolumeNodes(nodesInfo, autoVbs)
if err != nil {
return err
}
@@ -189,20 +189,21 @@ func (c *Calcium) doReallocContainer(
Quota: newCPU,
Memory: newMemory,
SoftLimit: container.SoftLimit,
Volumes: hardVolumesForContainer[container.ID],
Volumes: hardVbsForContainer[container.ID].ToStringSlice(false, false),
}
if len(container.CPU) > 0 {
newResource.CPU = cpusets[0]
newResource.NUMANode = node.GetNUMANode(cpusets[0])
cpusets = cpusets[1:]
}

if newVol != "" {
if newAutoVol != "" {
newResource.VolumePlan = volumePlans[idx].ToLiteral()
newResource.Volumes = append(newResource.Volumes, strings.Split(newVol, ",")...)
newResource.Volumes = append(newResource.Volumes, autoVbs.ToStringSlice(false, false)...)
}

if utils.CompareStringSlice(newResource.Volumes, container.Volumes) != 0 {
newVbs, _ := types.MakeVolumeBindings(newResource.Volumes)
if !newVbs.IsEqual(container.Volumes) {
newResource.VolumeChanged = true
}

@@ -212,8 +213,8 @@ func (c *Calcium) doReallocContainer(
container.CPU = newResource.CPU
container.Quota = newResource.Quota
container.Memory = newResource.Memory
container.Volumes = newResource.Volumes
container.VolumePlan = types.ToVolumePlan(newResource.VolumePlan)
container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
updateSuccess = true
} else {
log.Errorf("[doReallocContainer] Realloc container %s failed %v", container.ID, err)
Expand All @@ -223,8 +224,8 @@ func (c *Calcium) doReallocContainer(
// 失败的话,node 占用为老资源
node.CPU.Sub(container.CPU)
node.SetCPUUsed(container.Quota, types.IncrUsage)
node.Volume.Sub(container.VolumePlan.Merge())
node.SetVolumeUsed(container.VolumePlan.Merge().Total(), types.IncrUsage)
node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
node.MemCap -= container.Memory
if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
node.DecrNUMANodeMemory(nodeID, container.Memory)
50 changes: 22 additions & 28 deletions cluster/calcium/realloc_test.go
@@ -52,8 +52,8 @@ func TestRealloc(t *testing.T) {
Quota: 0.9,
CPU: types.CPUMap{"2": 90},
Nodename: "node1",
VolumePlan: types.VolumePlan{"AUTO:/data:rw:50": types.VolumeMap{"/dir0": 50}},
Volumes: []string{"AUTO:/data:rw:50"},
VolumePlan: types.VolumePlan{types.MustToVolumeBinding("AUTO:/data:rw:50"): types.VolumeMap{"/dir0": 50}},
Volumes: types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}),
}

c2 := &types.Container{
@@ -84,12 +84,6 @@ func TestRealloc(t *testing.T) {
assert.False(t, r.Success)
}
store.On("GetPod", mock.Anything, mock.Anything).Return(pod1, nil)
// failed by invalid volume realloc request
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.GiB), []string{"AUTO:/data:rw:1g"})
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
}
// failed by newCPU < 0
ch, err = c.ReallocResource(ctx, []string{"c1"}, -1, 2*int64(units.GiB), nil)
assert.NoError(t, err)
@@ -115,9 +109,9 @@ func TestRealloc(t *testing.T) {
c.scheduler = simpleMockScheduler
simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, 0, types.ErrInsufficientMEM).Once()
nodeVolumePlans := map[string][]types.VolumePlan{
"c1": {{"AUTO:/data:rw:50": types.VolumeMap{"/dir0": 50}}},
"c1": {{types.MustToVolumeBinding("AUTO:/data:rw:50"): types.VolumeMap{"/dir0": 50}}},
}
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, []string{"AUTO:/data:rw:50"}).Return(nil, nodeVolumePlans, 1, nil)
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"})).Return(nil, nodeVolumePlans, 1, nil)
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, 2*int64(units.MiB), nil)
assert.NoError(t, err)
for r := range ch {
@@ -173,28 +167,28 @@ func TestRealloc(t *testing.T) {
// failed by volume binding incompatible
nodeVolumePlans = map[string][]types.VolumePlan{
node1.Name: {
{"AUTO:/data:rw:100": types.VolumeMap{"/dir1": 100}},
{"AUTO:/data:rw:100": types.VolumeMap{"/dir2": 100}},
{"AUTO:/data:rw:100": types.VolumeMap{"/dir3": 100}},
{"AUTO:/data:rw:100": types.VolumeMap{"/dir4": 100}},
{types.MustToVolumeBinding("AUTO:/data:rw:100"): types.VolumeMap{"/dir1": 100}},
{types.MustToVolumeBinding("AUTO:/data:rw:100"): types.VolumeMap{"/dir2": 100}},
{types.MustToVolumeBinding("AUTO:/data:rw:100"): types.VolumeMap{"/dir3": 100}},
{types.MustToVolumeBinding("AUTO:/data:rw:100"): types.VolumeMap{"/dir4": 100}},
},
}
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, []string{"AUTO:/data:rw:100"}).Return(nil, nodeVolumePlans, 4, nil).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), []string{"AUTO:/data:rw:50"})
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data:rw:100"})).Return(nil, nodeVolumePlans, 4, nil).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:50"}))
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
}
// failed by volume schedule error
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nil, 0, types.ErrInsufficientVolume).Once()
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), []string{"AUTO:/data:rw:1"})
ch, err = c.ReallocResource(ctx, []string{"c1"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}))
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
}
// failed due to re-volume plan less then container number
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, mock.Anything).Return(nil, nodeVolumePlans, 0, nil).Twice()
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, int64(units.MiB), []string{"AUTO:/data:rw:1"})
ch, err = c.ReallocResource(ctx, []string{"c1", "c2"}, 0.1, int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data:rw:1"}))
assert.NoError(t, err)
for r := range ch {
assert.False(t, r.Success)
@@ -219,10 +213,10 @@ func TestRealloc(t *testing.T) {
Memory: 5 * int64(units.MiB),
Quota: 0.9,
CPU: types.CPUMap{"2": 90},
Volumes: []string{"AUTO:/data0:rw:100", "AUTO:/data1:rw:200"},
Volumes: types.MustToVolumeBindings([]string{"AUTO:/data0:rw:100", "AUTO:/data1:rw:200"}),
VolumePlan: types.VolumePlan{
"AUTO:/data0:rw:100": types.VolumeMap{"/dir0": 100},
"AUTO:/data1:rw:200": types.VolumeMap{"/dir1": 200},
types.MustToVolumeBinding("AUTO:/data0:rw:100"): types.VolumeMap{"/dir0": 100},
types.MustToVolumeBinding("AUTO:/data1:rw:200"): types.VolumeMap{"/dir1": 200},
},
Nodename: "node2",
}
@@ -232,7 +226,7 @@ func TestRealloc(t *testing.T) {
Engine: engine,
Memory: 5 * int64(units.MiB),
Quota: 0.9,
Volumes: []string{"/tmp:/tmp", "/var/log:/var/log:rw:300"},
Volumes: types.MustToVolumeBindings([]string{"/tmp:/tmp", "/var/log:/var/log:rw:300"}),
Nodename: "node2",
}
nodeCPUPlans = map[string][]types.CPUMap{
@@ -244,20 +238,20 @@ func TestRealloc(t *testing.T) {
nodeVolumePlans = map[string][]types.VolumePlan{
node2.Name: {
{
"AUTO:/data0:rw:50": types.VolumeMap{"/dir1": 50},
"AUTO:/data1:rw:200": types.VolumeMap{"/dir2": 200},
types.MustToVolumeBinding("AUTO:/data0:rw:50"): types.VolumeMap{"/dir1": 50},
types.MustToVolumeBinding("AUTO:/data1:rw:200"): types.VolumeMap{"/dir2": 200},
},
{
"AUTO:/data0:rw:50": types.VolumeMap{"/dir0": 50},
"AUTO:/data1:rw:200": types.VolumeMap{"/dir1": 200},
types.MustToVolumeBinding("AUTO:/data0:rw:50"): types.VolumeMap{"/dir0": 50},
types.MustToVolumeBinding("AUTO:/data1:rw:200"): types.VolumeMap{"/dir1": 200},
},
},
}
simpleMockScheduler.On("SelectCPUNodes", mock.Anything, mock.Anything, mock.Anything).Return(nil, nodeCPUPlans, 2, nil)
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, []string{"AUTO:/data0:rw:50", "AUTO:/data1:rw:200"}).Return(nil, nodeVolumePlans, 2, nil)
simpleMockScheduler.On("SelectVolumeNodes", mock.Anything, types.MustToVolumeBindings([]string{"AUTO:/data0:rw:50", "AUTO:/data1:rw:200"})).Return(nil, nodeVolumePlans, 2, nil)
store.On("GetNode", mock.Anything, "node2").Return(node2, nil)
store.On("GetContainers", mock.Anything, []string{"c3", "c4"}).Return([]*types.Container{c3, c4}, nil)
ch, err = c.ReallocResource(ctx, []string{"c3", "c4"}, 0.1, 2*int64(units.MiB), []string{"AUTO:/data0:rw:-50"})
ch, err = c.ReallocResource(ctx, []string{"c3", "c4"}, 0.1, 2*int64(units.MiB), types.MustToVolumeBindings([]string{"AUTO:/data0:rw:-50"}))
assert.NoError(t, err)
for r := range ch {
assert.True(t, r.Success)
(Diff truncated; the remaining changed files are not shown here.)
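For callers of the cluster API, the visible change is the ReallocResource signature: the volumes argument is now types.VolumeBindings rather than []string, so raw strings have to go through types.MakeVolumeBindings first. A hedged caller-side fragment follows; the helper name and its arguments are invented for illustration, and it is meant to sit inside cluster/calcium next to the code above:

```go
// reallocWithAutoVolume is a made-up helper showing only the migrated call.
func (c *Calcium) reallocWithAutoVolume(ctx context.Context, ids []string) error {
	// before this commit:
	//   ch, err := c.ReallocResource(ctx, ids, 0.1, int64(units.GiB), []string{"AUTO:/data:rw:50"})
	vbs, err := types.MakeVolumeBindings([]string{"AUTO:/data:rw:50"})
	if err != nil {
		return err
	}
	ch, err := c.ReallocResource(ctx, ids, 0.1, int64(units.GiB), vbs)
	if err != nil {
		return err
	}
	for msg := range ch {
		if !msg.Success {
			log.Errorf("[reallocWithAutoVolume] realloc %s failed", msg.ContainerID)
		}
	}
	return nil
}
```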