Skip to content

Commit

Permalink
realloc hard volume, like /tmp:/tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
zc committed Jan 9, 2020
1 parent d9d4e6b commit ca005ab
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 13 deletions.
9 changes: 5 additions & 4 deletions cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,16 +237,17 @@ func processVirtualizationOutStream(
return outCh
}

func mergeAutoVolumeRequests(volumes1 []string, volumes2 []string) (volumes []string, err error) {
func mergeAutoVolumeRequests(volumes1 []string, volumes2 []string) (autoVolumes []string, hardVolumes []string, err error) {
sizeMap := map[string]int64{} // {"AUTO:/data:rw": 100}
for _, vol := range append(volumes1, volumes2...) {
parts := strings.Split(vol, ":")
if len(parts) != 4 || parts[0] != types.AUTO {
hardVolumes = append(hardVolumes, vol)
continue
}
size, err := strconv.ParseInt(parts[3], 10, 64)
if err != nil {
return nil, err
return nil, nil, err
}
prefix := strings.Join(parts[0:3], ":")
if _, ok := sizeMap[prefix]; ok {
Expand All @@ -260,7 +261,7 @@ func mergeAutoVolumeRequests(volumes1 []string, volumes2 []string) (volumes []st
if size < 0 {
continue
}
volumes = append(volumes, fmt.Sprintf("%s:%d", prefix, size))
autoVolumes = append(autoVolumes, fmt.Sprintf("%s:%d", prefix, size))
}
return volumes, nil
return autoVolumes, hardVolumes, nil
}
16 changes: 16 additions & 0 deletions cluster/calcium/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package calcium

import (
"context"
"errors"
"testing"

lockmocks "github.com/projecteru2/core/lock/mocks"
Expand Down Expand Up @@ -105,6 +106,7 @@ func TestSetNode(t *testing.T) {
ctx := context.Background()
name := "test"
node := &types.Node{Name: name}
node.Init()

store := &storemocks.Store{}
c.store = store
Expand Down Expand Up @@ -216,4 +218,18 @@ func TestSetNode(t *testing.T) {
assert.Equal(t, n.InitCPU["3"], int64(10))
assert.Equal(t, len(n.CPU), 2)
assert.Equal(t, len(n.InitCPU), 2)
// succ set volume
n.Volume = types.VolumeMap{"/sda1": 10, "/sda2": 20}
setOpts.DeltaCPU = nil
setOpts.DeltaVolume = types.VolumeMap{"/sda0": 5, "/sda1": 0, "/sda2": -1}
n, err = c.SetNode(ctx, setOpts)
assert.NoError(t, err)
_, ok = n.Volume["/sda1"]
assert.False(t, ok)
assert.Equal(t, n.Volume["/sda0"], int64(5))
assert.Equal(t, n.Volume["/sda2"], int64(19))
// failed by nagative volume size
setOpts.DeltaVolume = types.VolumeMap{"/sda0": -100}
n, err = c.SetNode(ctx, setOpts)
assert.True(t, errors.Is(err, types.ErrBadVolume))
}
17 changes: 9 additions & 8 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (c *Calcium) doReallocContainer(
cpu float64, memory int64, volumes []string) {

volCPUMemNodeContainersInfo := volCPUMemNodeContainers{}
hardVolumesForContainer := map[string][]string{}
for nodename, containers := range nodeContainersInfo {
for _, container := range containers {
newCPU := utils.Round(container.Quota + cpu)
Expand All @@ -86,14 +87,15 @@ func (c *Calcium) doReallocContainer(
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
vol, err := mergeAutoVolumeRequests(container.Volumes, volumes)
autoVolumes, hardVolumes, err := mergeAutoVolumeRequests(container.Volumes, volumes)
hardVolumesForContainer[container.ID] = hardVolumes
if err != nil {
log.Errorf("[doReallocContainer] New resource invalid %s, vol %v, err %v", container.ID, vol, err)
log.Errorf("[doReallocContainer] New resource invalid %s, vol %v, err %v", container.ID, volumes, err)
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
continue
}
sort.Slice(vol, func(i, j int) bool { return vol[i] < vol[j] })
newVol := strings.Join(vol, ",")
sort.Slice(autoVolumes, func(i, j int) bool { return autoVolumes[i] < autoVolumes[j] })
newVol := strings.Join(autoVolumes, ",")

if _, ok := volCPUMemNodeContainersInfo[newVol]; !ok {
volCPUMemNodeContainersInfo[newVol] = map[float64]map[int64]nodeContainers{}
Expand Down Expand Up @@ -157,9 +159,8 @@ func (c *Calcium) doReallocContainer(
var volumePlans []types.VolumePlan
// newVol won't be empty as long as existing container bound volumes before or realloc request requires new binding
if newVol != "" {
volumes := strings.Split(newVol, ",")
nodesInfo := []types.NodeInfo{{Name: node.Name, VolumeMap: node.Volume}}
_, nodeVolumePlans, total, err := c.scheduler.SelectVolumeNodes(nodesInfo, volumes)
_, nodeVolumePlans, total, err := c.scheduler.SelectVolumeNodes(nodesInfo, strings.Split(newVol, ","))
if err != nil {
return err
}
Expand All @@ -184,7 +185,7 @@ func (c *Calcium) doReallocContainer(
}

for idx, container := range containers {
newResource := &enginetypes.VirtualizationResource{Quota: newCPU, Memory: newMemory, SoftLimit: container.SoftLimit, Volumes: volumes}
newResource := &enginetypes.VirtualizationResource{Quota: newCPU, Memory: newMemory, SoftLimit: container.SoftLimit, Volumes: strings.Split(newVol, ",")}
if len(container.CPU) > 0 {
newResource.CPU = cpusets[0]
newResource.NUMANode = node.GetNUMANode(cpusets[0])
Expand All @@ -201,7 +202,7 @@ func (c *Calcium) doReallocContainer(
container.CPU = newResource.CPU
container.Quota = newResource.Quota
container.Memory = newResource.Memory
container.Volumes = newResource.Volumes
container.Volumes = append(newResource.Volumes, hardVolumesForContainer[container.ID]...)
container.VolumePlan = types.ToVolumePlan(newResource.VolumePlan)
updateSuccess = true
} else {
Expand Down
2 changes: 1 addition & 1 deletion engine/docker/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (e *Engine) VirtualizationUpdateResource(ctx context.Context, ID string, op
return coretypes.ErrBadMemory
}
if opts.VolumePlan != nil {
log.Warn("[VirtualizationUpdateResource] docker engine not support rebinding volume resource")
log.Warnf("[VirtualizationUpdateResource] docker engine not support rebinding volume resource: %v", opts.Volumes)
}
newResource := makeResourceSetting(opts.Quota, opts.Memory, opts.CPU, opts.NUMANode, opts.SoftLimit)
updateConfig := dockercontainer.UpdateConfig{Resources: newResource}
Expand Down
3 changes: 3 additions & 0 deletions store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ func (m *Mercury) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ
if opts.Share == 0 {
opts.Share = m.config.Scheduler.ShareBase
}
if opts.Volume == nil {
opts.Volume = types.VolumeMap{}
}
// 设置 numa 的内存默认值,如果没有的话,按照 numa node 个数均分
if len(opts.Numa) > 0 {
nodeIDs := map[string]struct{}{}
Expand Down

0 comments on commit ca005ab

Please sign in to comment.