schedule volume resource (#161)
* add volume_map into protobuf

* node management in rpc layer

* node management in cluster and store

* container operation with volume on protobuf

* cluster interface and rpc adapter

* calcium can UpdateNodeResource with volume map: create / dissociate / remove

* implement multiple fragments scheduling algorithm

* share host struct to distribute resource

* bugfix and pass test

* specify int64 for ResourceMap

* introduce VolumePlan type, create volumes using scheduled plans

* store.AddNode interface: introduce parameter object

* engine handles auto volume

* unittest for volume distribution algorithm

* createContainerMessage contains volume plan

* refactor: rename, minors

* realloc volume schedule

* typo, bugfix and minors

* replenish types/node_test

* keep the same device binding as existing container's (vm's) when realloc

* unittest for realloc

* soothe lint

* get container with volumes and volume plan

* unittest covers realloc more

* realloc hard volume, like /tmp:/tmp

* engine knows whether volume changed by realloc request

* delete VolumeUsed in NodeInfo: useless

* remedy CompareStringSlice comments
zc authored and CMGS committed Jan 14, 2020
1 parent 64329db commit 677104b
Showing 50 changed files with 1,875 additions and 796 deletions.
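
Note: the diffs below lean on two new types from the types package whose definitions are not among the visible hunks. A minimal sketch of their apparent shape, inferred from how they are used in this commit (the exact definitions and the Merge body are assumptions, not the actual code):

// A volume request takes the form "AUTO:/mount/point:flags:size"; the
// scheduler picks a concrete device for each AUTO request.
type VolumeMap map[string]int64 // device -> capacity, e.g. {"/sda1": 100}

// VolumePlan maps a volume request string to the device binding the
// scheduler chose for it.
type VolumePlan map[string]VolumeMap

// Merge flattens a plan into one per-device VolumeMap, as passed to
// UpdateNodeResource below (assumed implementation).
func (p VolumePlan) Merge() VolumeMap {
	merged := VolumeMap{}
	for _, vm := range p {
		for device, size := range vm {
			merged[device] += size
		}
	}
	return merged
}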
1 change: 1 addition & 0 deletions .gitignore
@@ -25,3 +25,4 @@ cscope.*
 *~
 .DS_Store
 tools/updatev4.go
+*.swp
1 change: 0 additions & 1 deletion Makefile
@@ -10,7 +10,6 @@ GO_LDFLAGS ?= -s -X $(REPO_PATH)/versioninfo.REVISION=$(REVISION) \
 
 grpc:
 	cd ./rpc/gen/; protoc --go_out=plugins=grpc:. core.proto
-	cd ./rpc/gen/; python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. core.proto;
 
 deps:
 	env GO111MODULE=on go mod download
51 changes: 36 additions & 15 deletions cluster/calcium/create.go
@@ -68,7 +68,7 @@ func (c *Calcium) doCreateContainer(ctx context.Context, opts *types.DeployOptio
 			ch <- m
 			if m.Error != nil && m.ContainerID == "" {
 				if err := c.withNodeLocked(ctx, nodeInfo.Name, func(node *types.Node) error {
-					return c.store.UpdateNodeResource(ctx, node, m.CPU, opts.CPUQuota, opts.Memory, opts.Storage, store.ActionIncr)
+					return c.store.UpdateNodeResource(ctx, node, m.CPU, opts.CPUQuota, opts.Memory, opts.Storage, m.VolumePlan.Merge(), store.ActionIncr)
 				}); err != nil {
 					log.Errorf("[doCreateContainer] Reset node %s failed %v", nodeInfo.Name, err)
 				}
@@ -100,7 +100,15 @@ func (c *Calcium) doCreateContainerOnNode(ctx context.Context, nodeInfo types.No
 		if len(nodeInfo.CPUPlan) > 0 {
 			cpu = nodeInfo.CPUPlan[i]
 		}
-		ms[i] = &types.CreateContainerMessage{Error: err, CPU: cpu}
+		volumePlan := map[string]types.VolumeMap{}
+		if len(nodeInfo.VolumePlans) > 0 {
+			volumePlan = nodeInfo.VolumePlans[i]
+		}
+		ms[i] = &types.CreateContainerMessage{
+			Error:      err,
+			CPU:        cpu,
+			VolumePlan: volumePlan,
+		}
 	}
 	return ms
 }
@@ -111,7 +119,11 @@ func (c *Calcium) doCreateContainerOnNode(ctx context.Context, nodeInfo types.No
 		if len(nodeInfo.CPUPlan) > 0 {
 			cpu = nodeInfo.CPUPlan[i]
 		}
-		ms[i] = c.doCreateAndStartContainer(ctx, i+index, node, opts, cpu)
+		volumePlan := types.VolumePlan{}
+		if len(nodeInfo.VolumePlans) > 0 {
+			volumePlan = nodeInfo.VolumePlans[i]
+		}
+		ms[i] = c.doCreateAndStartContainer(ctx, i+index, node, opts, cpu, volumePlan)
 		if !ms[i].Success {
 			log.Errorf("[doCreateContainerOnNode] Error when create and start a container, %v", ms[i].Error)
 			continue
@@ -136,6 +148,7 @@ func (c *Calcium) doCreateAndStartContainer(
 	no int, node *types.Node,
 	opts *types.DeployOptions,
 	cpu types.CPUMap,
+	volumePlan types.VolumePlan,
 ) *types.CreateContainerMessage {
 	container := &types.Container{
 		Podname: opts.Podname,
@@ -152,16 +165,18 @@ func (c *Calcium) doCreateAndStartContainer(
 		Env:        opts.Env,
 		User:       opts.User,
 		Volumes:    opts.Volumes,
+		VolumePlan: volumePlan,
 	}
 	createContainerMessage := &types.CreateContainerMessage{
-		Podname:  container.Podname,
-		Nodename: container.Nodename,
-		Success:  false,
-		CPU:      cpu,
-		Quota:    opts.CPUQuota,
-		Memory:   opts.Memory,
-		Storage:  opts.Storage,
-		Publish:  map[string][]string{},
+		Podname:    container.Podname,
+		Nodename:   container.Nodename,
+		Success:    false,
+		CPU:        cpu,
+		Quota:      opts.CPUQuota,
+		Memory:     opts.Memory,
+		Storage:    opts.Storage,
+		VolumePlan: volumePlan,
+		Publish:    map[string][]string{},
 	}
 	var err error
 
@@ -177,7 +192,7 @@ func (c *Calcium) doCreateAndStartContainer(
 	}()
 
 	// get config
-	config := c.doMakeContainerOptions(no, cpu, opts, node)
+	config := c.doMakeContainerOptions(no, cpu, volumePlan, opts, node)
 	container.Name = config.Name
 	container.Labels = config.Labels
 	createContainerMessage.ContainerName = container.Name
@@ -242,11 +257,11 @@ func (c *Calcium) doCreateAndStartContainer(
 	return createContainerMessage
 }
 
-func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, opts *types.DeployOptions, node *types.Node) *enginetypes.VirtualizationCreateOptions {
+func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, volumePlan types.VolumePlan, opts *types.DeployOptions, node *types.Node) *enginetypes.VirtualizationCreateOptions {
 	config := &enginetypes.VirtualizationCreateOptions{}
 	// general
 	config.Seq = index
-	config.CPU = cpumap.Map()
+	config.CPU = cpumap
 	config.Quota = opts.CPUQuota
 	config.Memory = opts.Memory
 	config.Storage = opts.Storage
@@ -259,10 +274,16 @@ func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, opts *t
 	config.Image = opts.Image
 	config.Stdin = opts.OpenStdin
 	config.Hosts = opts.ExtraHosts
-	config.Volumes = opts.Volumes
+	config.Volumes = make([]string, len(opts.Volumes))
 	config.Debug = opts.Debug
 	config.Network = opts.NetworkMode
 	config.Networks = opts.Networks
+
+	// volumes
+	for idx, volume := range opts.Volumes {
+		config.Volumes[idx] = volumePlan.GetVolumeString(volume)
+	}
+
 	// entry
 	entry := opts.Entrypoint
 	config.WorkingDir = entry.Dir
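
A note on the new volumes loop in doMakeContainerOptions above: GetVolumeString is defined in the types package and its body is not part of this diff, so the following is only an assumed illustration of its behavior. The idea is that a request the scheduler planned (an AUTO volume) has its AUTO prefix rewritten to the chosen device, while anything else passes through unchanged:

plan := types.VolumePlan{
	"AUTO:/data:rw:100": types.VolumeMap{"/sda1": 100}, // hypothetical scheduled binding
}
plan.GetVolumeString("AUTO:/data:rw:100") // -> "/sda1:/data:rw:100" (assumed)
plan.GetVolumeString("/tmp:/tmp")         // -> "/tmp:/tmp", untouched (assumed)
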
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
@@ -20,7 +20,7 @@ func (c *Calcium) DissociateContainer(ctx context.Context, IDs []string) (chan *
 				return err
 			}
 			log.Infof("[DissociateContainer] Container %s dissociated", container.ID)
-			return c.store.UpdateNodeResource(ctx, node, container.CPU, container.Quota, container.Memory, container.Storage, store.ActionIncr)
+			return c.store.UpdateNodeResource(ctx, node, container.CPU, container.Quota, container.Memory, container.Storage, container.VolumePlan.Merge(), store.ActionIncr)
 		})
 	})
 	if err != nil {
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate_test.go
@@ -57,7 +57,7 @@ func TestDissociateContainer(t *testing.T) {
 	}
 	store.On("RemoveContainer", mock.Anything, mock.Anything).Return(nil)
 	// success
-	store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
+	store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
 	ch, err = c.DissociateContainer(ctx, []string{"c1"})
 	assert.NoError(t, err)
 	for r := range ch {
32 changes: 32 additions & 0 deletions cluster/calcium/helper.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"strconv"
+	"strings"
 
 	"bufio"
 
@@ -140,6 +142,7 @@ func getNodesInfo(nodes map[string]*types.Node, cpu float64, memory, storage int
 		nodeInfo := types.NodeInfo{
 			Name:       node.Name,
 			CPUMap:     node.CPU,
+			VolumeMap:  node.Volume,
 			MemCap:     node.MemCap,
 			StorageCap: node.AvailableStorage(),
 			CPURate:    cpu / float64(len(node.InitCPU)),
@@ -232,3 +235,32 @@ func processVirtualizationOutStream(
 	}()
 	return outCh
 }
+
+func mergeAutoVolumeRequests(volumes1 []string, volumes2 []string) (autoVolumes []string, hardVolumes []string, err error) {
+	sizeMap := map[string]int64{} // {"AUTO:/data:rw": 100}
+	for _, vol := range append(volumes1, volumes2...) {
+		parts := strings.Split(vol, ":")
+		if len(parts) != 4 || parts[0] != types.AUTO {
+			hardVolumes = append(hardVolumes, vol)
+			continue
+		}
+		size, err := strconv.ParseInt(parts[3], 10, 64)
+		if err != nil {
+			return nil, nil, err
+		}
+		prefix := strings.Join(parts[0:3], ":")
+		if _, ok := sizeMap[prefix]; ok {
+			sizeMap[prefix] += size
+		} else {
+			sizeMap[prefix] = size
+		}
+	}
+
+	for prefix, size := range sizeMap {
+		if size < 0 {
+			continue
+		}
+		autoVolumes = append(autoVolumes, fmt.Sprintf("%s:%d", prefix, size))
+	}
+	return autoVolumes, hardVolumes, nil
+}
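
Since mergeAutoVolumeRequests is shown in full above, a quick worked example of what it returns (a sketch; it assumes types.AUTO is the literal "AUTO", as the sizeMap comment suggests):

auto, hard, err := mergeAutoVolumeRequests(
	[]string{"AUTO:/data:rw:50", "/tmp:/tmp"},
	[]string{"AUTO:/data:rw:50"},
)
// err  == nil
// auto == []string{"AUTO:/data:rw:100"}: both requests share the
//         "AUTO:/data:rw" prefix, so their sizes are summed
// hard == []string{"/tmp:/tmp"}: fewer than four colon-separated parts,
//         so it is passed through as a hard volume
// Note that a prefix whose merged size ends up negative is silently
// dropped rather than reported as an error.
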
21 changes: 20 additions & 1 deletion cluster/calcium/node.go
@@ -10,10 +10,13 @@ import (
 )
 
 // AddNode add a node in pod
+/*
 func (c *Calcium) AddNode(ctx context.Context, nodename, endpoint, podname, ca, cert, key string,
 	cpu, share int, memory, storage int64, labels map[string]string,
 	numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error) {
-	return c.store.AddNode(ctx, nodename, endpoint, podname, ca, cert, key, cpu, share, memory, storage, labels, numa, numaMemory)
+*/
+func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*types.Node, error) {
+	return c.store.AddNode(ctx, opts)
 }
 
 // RemoveNode remove a node
@@ -126,6 +129,22 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
 				}
 			}
 		}
+		// update volume
+		for volumeDir, changeCap := range opts.DeltaVolume {
+			if _, ok := n.Volume[volumeDir]; !ok && changeCap > 0 {
+				n.Volume[volumeDir] = changeCap
+				n.InitVolume[volumeDir] = changeCap
+			} else if ok && changeCap == 0 {
+				delete(n.Volume, volumeDir)
+				delete(n.InitVolume, volumeDir)
+			} else if ok {
+				n.Volume[volumeDir] += changeCap
+				n.InitVolume[volumeDir] += changeCap
+				if n.Volume[volumeDir] < 0 {
+					return types.ErrBadVolume
+				}
+			}
+		}
 		return c.store.UpdateNode(ctx, n)
 	})
 }
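
The DeltaVolume semantics above, worked through on a small example (this mirrors the new assertions in node_test.go below):

n.Volume = types.VolumeMap{"/sda1": 10, "/sda2": 20}
opts.DeltaVolume = types.VolumeMap{"/sda0": 5, "/sda1": 0, "/sda2": -1}
// After SetNode:
//   "/sda0": absent with a positive delta, so created with capacity 5
//   "/sda1": present with a zero delta, so removed from Volume and InitVolume
//   "/sda2": present with a nonzero delta, so 20 - 1 = 19
// A further DeltaVolume{"/sda0": -100} would drive the capacity negative,
// and SetNode returns types.ErrBadVolume.
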
26 changes: 21 additions & 5 deletions cluster/calcium/node_test.go
@@ -2,6 +2,7 @@ package calcium
 
 import (
 	"context"
+	"errors"
 	"testing"
 
 	lockmocks "github.com/projecteru2/core/lock/mocks"
@@ -28,7 +29,7 @@ func TestAddNode(t *testing.T) {
 		mock.Anything, mock.Anything, mock.Anything).Return(node, nil)
 	c.store = store
 
-	n, err := c.AddNode(ctx, "", "", "", "", "", "", 0, 0, int64(0), int64(0), nil, nil, nil)
+	n, err := c.AddNode(ctx, &types.AddNodeOptions{})
 	assert.NoError(t, err)
 	assert.Equal(t, n.Name, name)
 }
@@ -105,6 +106,7 @@ func TestSetNode(t *testing.T) {
 	ctx := context.Background()
 	name := "test"
 	node := &types.Node{Name: name}
+	node.Init()
 
 	store := &storemocks.Store{}
 	c.store = store
@@ -210,10 +212,24 @@ func TestSetNode(t *testing.T) {
 	assert.NoError(t, err)
 	_, ok := n.CPU["1"]
 	assert.False(t, ok)
-	assert.Equal(t, n.CPU["2"], 1)
-	assert.Equal(t, n.InitCPU["2"], 9)
-	assert.Equal(t, n.CPU["3"], 10)
-	assert.Equal(t, n.InitCPU["3"], 10)
+	assert.Equal(t, n.CPU["2"], int64(1))
+	assert.Equal(t, n.InitCPU["2"], int64(9))
+	assert.Equal(t, n.CPU["3"], int64(10))
+	assert.Equal(t, n.InitCPU["3"], int64(10))
 	assert.Equal(t, len(n.CPU), 2)
 	assert.Equal(t, len(n.InitCPU), 2)
+	// succ set volume
+	n.Volume = types.VolumeMap{"/sda1": 10, "/sda2": 20}
+	setOpts.DeltaCPU = nil
+	setOpts.DeltaVolume = types.VolumeMap{"/sda0": 5, "/sda1": 0, "/sda2": -1}
+	n, err = c.SetNode(ctx, setOpts)
+	assert.NoError(t, err)
+	_, ok = n.Volume["/sda1"]
+	assert.False(t, ok)
+	assert.Equal(t, n.Volume["/sda0"], int64(5))
+	assert.Equal(t, n.Volume["/sda2"], int64(19))
+	// failed by negative volume size
+	setOpts.DeltaVolume = types.VolumeMap{"/sda0": -100}
+	n, err = c.SetNode(ctx, setOpts)
+	assert.True(t, errors.Is(err, types.ErrBadVolume))
 }