From ca005ab5380ac62db20bbe66738bd66f3c49a85d Mon Sep 17 00:00:00 2001 From: zc Date: Thu, 9 Jan 2020 17:51:16 +0800 Subject: [PATCH] realloc hard volume, like /tmp:/tmp --- cluster/calcium/helper.go | 9 +++++---- cluster/calcium/node_test.go | 16 ++++++++++++++++ cluster/calcium/realloc.go | 17 +++++++++-------- engine/docker/container.go | 2 +- store/etcdv3/node.go | 3 +++ 5 files changed, 34 insertions(+), 13 deletions(-) diff --git a/cluster/calcium/helper.go b/cluster/calcium/helper.go index 10a9aa362..8068f23c4 100644 --- a/cluster/calcium/helper.go +++ b/cluster/calcium/helper.go @@ -237,16 +237,17 @@ func processVirtualizationOutStream( return outCh } -func mergeAutoVolumeRequests(volumes1 []string, volumes2 []string) (volumes []string, err error) { +func mergeAutoVolumeRequests(volumes1 []string, volumes2 []string) (autoVolumes []string, hardVolumes []string, err error) { sizeMap := map[string]int64{} // {"AUTO:/data:rw": 100} for _, vol := range append(volumes1, volumes2...) { parts := strings.Split(vol, ":") if len(parts) != 4 || parts[0] != types.AUTO { + hardVolumes = append(hardVolumes, vol) continue } size, err := strconv.ParseInt(parts[3], 10, 64) if err != nil { - return nil, err + return nil, nil, err } prefix := strings.Join(parts[0:3], ":") if _, ok := sizeMap[prefix]; ok { @@ -260,7 +261,7 @@ func mergeAutoVolumeRequests(volumes1 []string, volumes2 []string) (volumes []st if size < 0 { continue } - volumes = append(volumes, fmt.Sprintf("%s:%d", prefix, size)) + autoVolumes = append(autoVolumes, fmt.Sprintf("%s:%d", prefix, size)) } - return volumes, nil + return autoVolumes, hardVolumes, nil } diff --git a/cluster/calcium/node_test.go b/cluster/calcium/node_test.go index 677d86015..8aad8982a 100644 --- a/cluster/calcium/node_test.go +++ b/cluster/calcium/node_test.go @@ -2,6 +2,7 @@ package calcium import ( "context" + "errors" "testing" lockmocks "github.com/projecteru2/core/lock/mocks" @@ -105,6 +106,7 @@ func TestSetNode(t *testing.T) { ctx := context.Background() name := "test" node := &types.Node{Name: name} + node.Init() store := &storemocks.Store{} c.store = store @@ -216,4 +218,18 @@ func TestSetNode(t *testing.T) { assert.Equal(t, n.InitCPU["3"], int64(10)) assert.Equal(t, len(n.CPU), 2) assert.Equal(t, len(n.InitCPU), 2) + // succ set volume + n.Volume = types.VolumeMap{"/sda1": 10, "/sda2": 20} + setOpts.DeltaCPU = nil + setOpts.DeltaVolume = types.VolumeMap{"/sda0": 5, "/sda1": 0, "/sda2": -1} + n, err = c.SetNode(ctx, setOpts) + assert.NoError(t, err) + _, ok = n.Volume["/sda1"] + assert.False(t, ok) + assert.Equal(t, n.Volume["/sda0"], int64(5)) + assert.Equal(t, n.Volume["/sda2"], int64(19)) + // failed by nagative volume size + setOpts.DeltaVolume = types.VolumeMap{"/sda0": -100} + n, err = c.SetNode(ctx, setOpts) + assert.True(t, errors.Is(err, types.ErrBadVolume)) } diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go index 830870d33..9f3bd1c24 100644 --- a/cluster/calcium/realloc.go +++ b/cluster/calcium/realloc.go @@ -77,6 +77,7 @@ func (c *Calcium) doReallocContainer( cpu float64, memory int64, volumes []string) { volCPUMemNodeContainersInfo := volCPUMemNodeContainers{} + hardVolumesForContainer := map[string][]string{} for nodename, containers := range nodeContainersInfo { for _, container := range containers { newCPU := utils.Round(container.Quota + cpu) @@ -86,14 +87,15 @@ func (c *Calcium) doReallocContainer( ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} continue } - vol, err := mergeAutoVolumeRequests(container.Volumes, volumes) + autoVolumes, hardVolumes, err := mergeAutoVolumeRequests(container.Volumes, volumes) + hardVolumesForContainer[container.ID] = hardVolumes if err != nil { - log.Errorf("[doReallocContainer] New resource invalid %s, vol %v, err %v", container.ID, vol, err) + log.Errorf("[doReallocContainer] New resource invalid %s, vol %v, err %v", container.ID, volumes, err) ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false} continue } - sort.Slice(vol, func(i, j int) bool { return vol[i] < vol[j] }) - newVol := strings.Join(vol, ",") + sort.Slice(autoVolumes, func(i, j int) bool { return autoVolumes[i] < autoVolumes[j] }) + newVol := strings.Join(autoVolumes, ",") if _, ok := volCPUMemNodeContainersInfo[newVol]; !ok { volCPUMemNodeContainersInfo[newVol] = map[float64]map[int64]nodeContainers{} @@ -157,9 +159,8 @@ func (c *Calcium) doReallocContainer( var volumePlans []types.VolumePlan // newVol won't be empty as long as existing container bound volumes before or realloc request requires new binding if newVol != "" { - volumes := strings.Split(newVol, ",") nodesInfo := []types.NodeInfo{{Name: node.Name, VolumeMap: node.Volume}} - _, nodeVolumePlans, total, err := c.scheduler.SelectVolumeNodes(nodesInfo, volumes) + _, nodeVolumePlans, total, err := c.scheduler.SelectVolumeNodes(nodesInfo, strings.Split(newVol, ",")) if err != nil { return err } @@ -184,7 +185,7 @@ func (c *Calcium) doReallocContainer( } for idx, container := range containers { - newResource := &enginetypes.VirtualizationResource{Quota: newCPU, Memory: newMemory, SoftLimit: container.SoftLimit, Volumes: volumes} + newResource := &enginetypes.VirtualizationResource{Quota: newCPU, Memory: newMemory, SoftLimit: container.SoftLimit, Volumes: strings.Split(newVol, ",")} if len(container.CPU) > 0 { newResource.CPU = cpusets[0] newResource.NUMANode = node.GetNUMANode(cpusets[0]) @@ -201,7 +202,7 @@ func (c *Calcium) doReallocContainer( container.CPU = newResource.CPU container.Quota = newResource.Quota container.Memory = newResource.Memory - container.Volumes = newResource.Volumes + container.Volumes = append(newResource.Volumes, hardVolumesForContainer[container.ID]...) container.VolumePlan = types.ToVolumePlan(newResource.VolumePlan) updateSuccess = true } else { diff --git a/engine/docker/container.go b/engine/docker/container.go index 1f7cbc09f..40f6c7460 100644 --- a/engine/docker/container.go +++ b/engine/docker/container.go @@ -295,7 +295,7 @@ func (e *Engine) VirtualizationUpdateResource(ctx context.Context, ID string, op return coretypes.ErrBadMemory } if opts.VolumePlan != nil { - log.Warn("[VirtualizationUpdateResource] docker engine not support rebinding volume resource") + log.Warnf("[VirtualizationUpdateResource] docker engine not support rebinding volume resource: %v", opts.Volumes) } newResource := makeResourceSetting(opts.Quota, opts.Memory, opts.CPU, opts.NUMANode, opts.SoftLimit) updateConfig := dockercontainer.UpdateConfig{Resources: newResource} diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go index 29bc3be0d..96862be64 100644 --- a/store/etcdv3/node.go +++ b/store/etcdv3/node.go @@ -55,6 +55,9 @@ func (m *Mercury) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*typ if opts.Share == 0 { opts.Share = m.config.Scheduler.ShareBase } + if opts.Volume == nil { + opts.Volume = types.VolumeMap{} + } // 设置 numa 的内存默认值,如果没有的话,按照 numa node 个数均分 if len(opts.Numa) > 0 { nodeIDs := map[string]struct{}{}