schedule volume resource #161

Merged
merged 28 commits on Jan 14, 2020
Changes from 26 commits
Commits
28 commits
42261f3
add volume_map into protobuf
Dec 19, 2019
d2be5aa
node management in rpc layer
Dec 19, 2019
229bf5f
node management in cluster and store
Dec 19, 2019
fb769d4
container operation with volume on protobuf
Dec 19, 2019
70903f9
cluster interface and rpc adapter
Dec 19, 2019
cadae46
calcium can UpdateNodeResource with volume map: create / dissociate /
Dec 23, 2019
72b41bf
implement multiple fragments scheduling algorithm
Dec 26, 2019
02c4ac8
share host struct to distribute resource
Dec 27, 2019
aad7f48
bugfix and pass test
Dec 27, 2019
e44f03d
specify int64 for ResourceMap
Dec 31, 2019
03ac088
introduce VolumePlan type, create volumes using scheduled plans
Jan 2, 2020
945ea06
store.AddNode interface: introduce parameter object
Jan 2, 2020
15c212c
engine handles auto volume
Jan 3, 2020
7b4b8b0
unittest for volume distribution algorithm
Jan 5, 2020
f04a813
createContainerMessage contains volume plan
Jan 6, 2020
d5578ff
refactor: rename, minors
Jan 6, 2020
bfb05d3
realloc volume schedule
Jan 7, 2020
6de5b04
typo, bugfix and minors
Jan 7, 2020
2a4e463
replenish types/node_test
Jan 8, 2020
720d514
keep the same device binding as existing container's (vm's) when realloc
Jan 8, 2020
5ae4244
unittest for realloc
Jan 8, 2020
f918d65
soothe lint
Jan 9, 2020
c4fcb5a
get container with volumes and volume plan
Jan 9, 2020
d9d4e6b
unittest covers realloc more
Jan 9, 2020
ca005ab
realloc hard volume, like /tmp:/tmp
Jan 9, 2020
f361ad5
engine knows whether volume changed by realloc request
Jan 10, 2020
550098a
delete VolumeUsed in NodeInfo: useless
Jan 13, 2020
d5bb274
remedy CompareStringSlice comments
Jan 13, 2020
1 change: 1 addition & 0 deletions .gitignore
@@ -25,3 +25,4 @@ cscope.*
*~
.DS_Store
tools/updatev4.go
*.swp
1 change: 0 additions & 1 deletion Makefile
@@ -10,7 +10,6 @@ GO_LDFLAGS ?= -s -X $(REPO_PATH)/versioninfo.REVISION=$(REVISION) \

grpc:
cd ./rpc/gen/; protoc --go_out=plugins=grpc:. core.proto
cd ./rpc/gen/; python3 -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. core.proto;

deps:
env GO111MODULE=on go mod download
51 changes: 36 additions & 15 deletions cluster/calcium/create.go
@@ -68,7 +68,7 @@ func (c *Calcium) doCreateContainer(ctx context.Context, opts *types.DeployOptio
ch <- m
if m.Error != nil && m.ContainerID == "" {
if err := c.withNodeLocked(ctx, nodeInfo.Name, func(node *types.Node) error {
return c.store.UpdateNodeResource(ctx, node, m.CPU, opts.CPUQuota, opts.Memory, opts.Storage, store.ActionIncr)
return c.store.UpdateNodeResource(ctx, node, m.CPU, opts.CPUQuota, opts.Memory, opts.Storage, m.VolumePlan.Merge(), store.ActionIncr)
}); err != nil {
log.Errorf("[doCreateContainer] Reset node %s failed %v", nodeInfo.Name, err)
}
@@ -100,7 +100,15 @@ func (c *Calcium) doCreateContainerOnNode(ctx context.Context, nodeInfo types.No
if len(nodeInfo.CPUPlan) > 0 {
cpu = nodeInfo.CPUPlan[i]
}
ms[i] = &types.CreateContainerMessage{Error: err, CPU: cpu}
volumePlan := map[string]types.VolumeMap{}
if len(nodeInfo.VolumePlans) > 0 {
volumePlan = nodeInfo.VolumePlans[i]
}
ms[i] = &types.CreateContainerMessage{
Error: err,
CPU: cpu,
VolumePlan: volumePlan,
}
}
return ms
}
@@ -111,7 +119,11 @@ func (c *Calcium) doCreateContainerOnNode(ctx context.Context, nodeInfo types.No
if len(nodeInfo.CPUPlan) > 0 {
cpu = nodeInfo.CPUPlan[i]
}
ms[i] = c.doCreateAndStartContainer(ctx, i+index, node, opts, cpu)
volumePlan := types.VolumePlan{}
if len(nodeInfo.VolumePlans) > 0 {
volumePlan = nodeInfo.VolumePlans[i]
}
ms[i] = c.doCreateAndStartContainer(ctx, i+index, node, opts, cpu, volumePlan)
if !ms[i].Success {
log.Errorf("[doCreateContainerOnNode] Error when create and start a container, %v", ms[i].Error)
continue
@@ -136,6 +148,7 @@ func (c *Calcium) doCreateAndStartContainer(
no int, node *types.Node,
opts *types.DeployOptions,
cpu types.CPUMap,
volumePlan types.VolumePlan,
) *types.CreateContainerMessage {
container := &types.Container{
Podname: opts.Podname,
@@ -152,16 +165,18 @@ func (c *Calcium) doCreateAndStartContainer(
Env: opts.Env,
User: opts.User,
Volumes: opts.Volumes,
VolumePlan: volumePlan,
}
createContainerMessage := &types.CreateContainerMessage{
Podname: container.Podname,
Nodename: container.Nodename,
Success: false,
CPU: cpu,
Quota: opts.CPUQuota,
Memory: opts.Memory,
Storage: opts.Storage,
Publish: map[string][]string{},
Podname: container.Podname,
Nodename: container.Nodename,
Success: false,
CPU: cpu,
Quota: opts.CPUQuota,
Memory: opts.Memory,
Storage: opts.Storage,
VolumePlan: volumePlan,
Publish: map[string][]string{},
}
var err error

@@ -177,7 +192,7 @@ func (c *Calcium) doCreateAndStartContainer(
}()

// get config
config := c.doMakeContainerOptions(no, cpu, opts, node)
config := c.doMakeContainerOptions(no, cpu, volumePlan, opts, node)
container.Name = config.Name
container.Labels = config.Labels
createContainerMessage.ContainerName = container.Name
@@ -242,11 +257,11 @@ func (c *Calcium) doCreateAndStartContainer(
return createContainerMessage
}

func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, opts *types.DeployOptions, node *types.Node) *enginetypes.VirtualizationCreateOptions {
func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, volumePlan types.VolumePlan, opts *types.DeployOptions, node *types.Node) *enginetypes.VirtualizationCreateOptions {
config := &enginetypes.VirtualizationCreateOptions{}
// general
config.Seq = index
config.CPU = cpumap.Map()
config.CPU = cpumap
config.Quota = opts.CPUQuota
config.Memory = opts.Memory
config.Storage = opts.Storage
@@ -259,10 +274,16 @@ func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, opts *t
config.Image = opts.Image
config.Stdin = opts.OpenStdin
config.Hosts = opts.ExtraHosts
config.Volumes = opts.Volumes
config.Volumes = make([]string, len(opts.Volumes))
config.Debug = opts.Debug
config.Network = opts.NetworkMode
config.Networks = opts.Networks

// volumes
for idx, volume := range opts.Volumes {
config.Volumes[idx] = volumePlan.GetVolumeString(volume)
}

// entry
entry := opts.Entrypoint
config.WorkingDir = entry.Dir
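The volume loop added to doMakeContainerOptions rewrites each requested binding through the scheduled plan before the engine sees it. VolumePlan.GetVolumeString is not part of the hunks shown here; the sketch below only illustrates the assumed mapping, where an auto request such as AUTO:/data:rw:100 is resolved to the device the scheduler picked and hard bindings pass through unchanged. The type shapes and method body are hypothetical (single-device case only; "strings" and "fmt" assumed imported).

// Hypothetical sketch, not the project's actual definitions.
type VolumeMap map[string]int64      // device -> allocated size, e.g. {"/dev/sda1": 100}
type VolumePlan map[string]VolumeMap // auto volume request -> scheduled device(s)

// GetVolumeString swaps the AUTO placeholder for the scheduled device.
func (p VolumePlan) GetVolumeString(request string) string {
	vm, ok := p[request]
	if !ok {
		return request // hard volume like /tmp:/tmp, passed through unchanged
	}
	parts := strings.Split(request, ":") // ["AUTO", "/data", "rw", "100"]
	for device, size := range vm {
		return fmt.Sprintf("%s:%s:%s:%d", device, parts[1], parts[2], size)
	}
	return request
}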
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate.go
@@ -20,7 +20,7 @@ func (c *Calcium) DissociateContainer(ctx context.Context, IDs []string) (chan *
return err
}
log.Infof("[DissociateContainer] Container %s dissociated", container.ID)
return c.store.UpdateNodeResource(ctx, node, container.CPU, container.Quota, container.Memory, container.Storage, store.ActionIncr)
return c.store.UpdateNodeResource(ctx, node, container.CPU, container.Quota, container.Memory, container.Storage, container.VolumePlan.Merge(), store.ActionIncr)
})
})
if err != nil {
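Dissociate (and the create rollback above) now returns the container's volume usage to the node through container.VolumePlan.Merge(). Merge is not visible in this diff; the sketch below is an assumed shape that flattens the per-request plan into one device -> size map, which is what a per-device UpdateNodeResource would need. It reuses the hypothetical VolumeMap/VolumePlan types sketched under create.go.

// Hypothetical sketch of the assumed Merge behaviour.
func (p VolumePlan) Merge() VolumeMap {
	merged := VolumeMap{}
	for _, vm := range p { // vm: devices scheduled for one volume request
		for device, size := range vm {
			merged[device] += size // accumulate total usage per device
		}
	}
	return merged
}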
2 changes: 1 addition & 1 deletion cluster/calcium/dissociate_test.go
@@ -57,7 +57,7 @@ func TestDissociateContainer(t *testing.T) {
}
store.On("RemoveContainer", mock.Anything, mock.Anything).Return(nil)
// success
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
store.On("UpdateNodeResource", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
ch, err = c.DissociateContainer(ctx, []string{"c1"})
assert.NoError(t, err)
for r := range ch {
33 changes: 33 additions & 0 deletions cluster/calcium/helper.go
@@ -5,6 +5,8 @@ import (
"fmt"
"io"
"io/ioutil"
"strconv"
"strings"

"bufio"

@@ -140,12 +142,14 @@ func getNodesInfo(nodes map[string]*types.Node, cpu float64, memory, storage int
nodeInfo := types.NodeInfo{
Name: node.Name,
CPUMap: node.CPU,
VolumeMap: node.Volume,
MemCap: node.MemCap,
StorageCap: node.AvailableStorage(),
CPURate: cpu / float64(len(node.InitCPU)),
MemRate: float64(memory) / float64(node.InitMemCap),
StorageRate: float64(storage) / float64(node.InitStorageCap),
CPUUsed: node.CPUUsed / float64(len(node.InitCPU)),
VolumeUsed: float64(node.VolumeUsed) / float64(len(node.InitVolume)),
MemUsage: 1.0 - float64(node.MemCap)/float64(node.InitMemCap),
StorageUsage: node.StorageUsage(),
Capacity: 0,
@@ -232,3 +236,32 @@ func processVirtualizationOutStream(
}()
return outCh
}

func mergeAutoVolumeRequests(volumes1 []string, volumes2 []string) (autoVolumes []string, hardVolumes []string, err error) {
sizeMap := map[string]int64{} // {"AUTO:/data:rw": 100}
for _, vol := range append(volumes1, volumes2...) {
parts := strings.Split(vol, ":")
if len(parts) != 4 || parts[0] != types.AUTO {
hardVolumes = append(hardVolumes, vol)
continue
}
size, err := strconv.ParseInt(parts[3], 10, 64)
if err != nil {
return nil, nil, err
}
prefix := strings.Join(parts[0:3], ":")
if _, ok := sizeMap[prefix]; ok {
sizeMap[prefix] += size
} else {
sizeMap[prefix] = size
}
}

for prefix, size := range sizeMap {
if size < 0 {
continue
}
autoVolumes = append(autoVolumes, fmt.Sprintf("%s:%d", prefix, size))
}
return autoVolumes, hardVolumes, nil
}
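mergeAutoVolumeRequests sums auto requests that share the same AUTO:<mount>:<flags> prefix and passes everything else through as hard volumes; a net size below zero drops the request entirely. A minimal usage sketch of the function above, presumably how the realloc path combines existing and requested volumes (types.AUTO is assumed to be the literal "AUTO"; values are illustrative):

autoVols, hardVols, err := mergeAutoVolumeRequests(
	[]string{"AUTO:/data:rw:100", "/tmp:/tmp"},
	[]string{"AUTO:/data:rw:-50"},
)
// err      == nil
// autoVols == []string{"AUTO:/data:rw:50"} -- 100 + (-50), same prefix summed
// hardVols == []string{"/tmp:/tmp"}        -- not a 4-part AUTO request, passed through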
21 changes: 20 additions & 1 deletion cluster/calcium/node.go
@@ -10,10 +10,13 @@ import (
)

// AddNode add a node in pod
/*
func (c *Calcium) AddNode(ctx context.Context, nodename, endpoint, podname, ca, cert, key string,
cpu, share int, memory, storage int64, labels map[string]string,
numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error) {
return c.store.AddNode(ctx, nodename, endpoint, podname, ca, cert, key, cpu, share, memory, storage, labels, numa, numaMemory)
*/
func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*types.Node, error) {
return c.store.AddNode(ctx, opts)
}

// RemoveNode remove a node
@@ -126,6 +129,22 @@ func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*typ
}
}
}
// update volume
for volumeDir, changeCap := range opts.DeltaVolume {
if _, ok := n.Volume[volumeDir]; !ok && changeCap > 0 {
n.Volume[volumeDir] = changeCap
n.InitVolume[volumeDir] = changeCap
} else if ok && changeCap == 0 {
delete(n.Volume, volumeDir)
delete(n.InitVolume, volumeDir)
} else if ok {
n.Volume[volumeDir] += changeCap
n.InitVolume[volumeDir] += changeCap
if n.Volume[volumeDir] < 0 {
return types.ErrBadVolume
}
}
}
return c.store.UpdateNode(ctx, n)
})
}
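The DeltaVolume branch above follows the same convention as the existing DeltaCPU handling. A small illustration of the semantics, matching the new TestSetNode cases below (values are made up; only the DeltaVolume field of SetNodeOptions is shown, c and ctx as in the calling code):

// Starting from node.Volume == types.VolumeMap{"/sda1": 10, "/sda2": 20}:
opts := &types.SetNodeOptions{
	DeltaVolume: types.VolumeMap{
		"/sda0": 5,  // unknown dir, positive delta: register new volume with capacity 5
		"/sda1": 0,  // known dir, zero delta: remove it from Volume and InitVolume
		"/sda2": -1, // known dir, non-zero delta: adjust capacity, 20 -> 19
	},
}
// After this update, a further DeltaVolume{"/sda0": -100} would drive /sda0 below
// zero, and SetNode returns types.ErrBadVolume.
_, _ = c.SetNode(ctx, opts)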
26 changes: 21 additions & 5 deletions cluster/calcium/node_test.go
@@ -2,6 +2,7 @@ package calcium

import (
"context"
"errors"
"testing"

lockmocks "github.com/projecteru2/core/lock/mocks"
@@ -28,7 +29,7 @@ func TestAddNode(t *testing.T) {
mock.Anything, mock.Anything, mock.Anything).Return(node, nil)
c.store = store

n, err := c.AddNode(ctx, "", "", "", "", "", "", 0, 0, int64(0), int64(0), nil, nil, nil)
n, err := c.AddNode(ctx, &types.AddNodeOptions{})
assert.NoError(t, err)
assert.Equal(t, n.Name, name)
}
@@ -105,6 +106,7 @@ func TestSetNode(t *testing.T) {
ctx := context.Background()
name := "test"
node := &types.Node{Name: name}
node.Init()

store := &storemocks.Store{}
c.store = store
@@ -210,10 +212,24 @@ func TestSetNode(t *testing.T) {
assert.NoError(t, err)
_, ok := n.CPU["1"]
assert.False(t, ok)
assert.Equal(t, n.CPU["2"], 1)
assert.Equal(t, n.InitCPU["2"], 9)
assert.Equal(t, n.CPU["3"], 10)
assert.Equal(t, n.InitCPU["3"], 10)
assert.Equal(t, n.CPU["2"], int64(1))
assert.Equal(t, n.InitCPU["2"], int64(9))
assert.Equal(t, n.CPU["3"], int64(10))
assert.Equal(t, n.InitCPU["3"], int64(10))
assert.Equal(t, len(n.CPU), 2)
assert.Equal(t, len(n.InitCPU), 2)
// succ set volume
n.Volume = types.VolumeMap{"/sda1": 10, "/sda2": 20}
setOpts.DeltaCPU = nil
setOpts.DeltaVolume = types.VolumeMap{"/sda0": 5, "/sda1": 0, "/sda2": -1}
n, err = c.SetNode(ctx, setOpts)
assert.NoError(t, err)
_, ok = n.Volume["/sda1"]
assert.False(t, ok)
assert.Equal(t, n.Volume["/sda0"], int64(5))
assert.Equal(t, n.Volume["/sda2"], int64(19))
// failed by negative volume size
setOpts.DeltaVolume = types.VolumeMap{"/sda0": -100}
n, err = c.SetNode(ctx, setOpts)
assert.True(t, errors.Is(err, types.ErrBadVolume))
}