From 8607785aad88c65c0e0067cca76b778642573f22 Mon Sep 17 00:00:00 2001
From: zc
Date: Fri, 17 Apr 2020 11:17:32 +0800
Subject: [PATCH] storage follows volume sizes (#186)

* storage coordinates with volume
* normalize deploy resource requirements
* realloc api will update containers' storage following volumes
* docker engine can handle storage with volume size
* bump version: 04.02
* delete metrics log flood
* fix unittest
* add comments for exported functions
---
 VERSION                      |  2 +-
 cluster/calcium/create.go    |  2 ++
 cluster/calcium/node.go      |  9 +++------
 cluster/calcium/node_test.go |  5 -----
 cluster/calcium/realloc.go   |  4 ++++
 engine/docker/container.go   | 13 ++++++++++++-
 metrics/metrics.go           |  1 -
 rpc/transform.go             |  2 +-
 types/options.go             | 20 ++++++++++++++++++++
 types/volume.go              | 10 ----------
 types/volume_test.go         |  1 -
 11 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/VERSION b/VERSION
index d086eb847..e358dcd55 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-20.04.01
+20.04.02
diff --git a/cluster/calcium/create.go b/cluster/calcium/create.go
index 6d6d5a56e..9fa1c0c3b 100644
--- a/cluster/calcium/create.go
+++ b/cluster/calcium/create.go
@@ -18,6 +18,7 @@ import (
 
 // CreateContainer use options to create containers
 func (c *Calcium) CreateContainer(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {
+	opts.Normalize()
 	opts.ProcessIdent = utils.RandomString(16)
 	pod, err := c.store.GetPod(ctx, opts.Podname)
 	if err != nil {
@@ -277,6 +278,7 @@ func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, volumeP
 	config.Stdin = opts.OpenStdin
 	config.Hosts = opts.ExtraHosts
 	config.Volumes = opts.Volumes.ApplyPlan(volumePlan).ToStringSlice(false, true)
+	config.VolumePlan = volumePlan.ToLiteral()
 	config.Debug = opts.Debug
 	config.Network = opts.NetworkMode
 	config.Networks = opts.Networks
diff --git a/cluster/calcium/node.go b/cluster/calcium/node.go
index f77d7b7f1..dc2565a2b 100644
--- a/cluster/calcium/node.go
+++ b/cluster/calcium/node.go
@@ -9,13 +9,9 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-// AddNode add a node in pod
-/*
-func (c *Calcium) AddNode(ctx context.Context, nodename, endpoint, podname, ca, cert, key string,
-	cpu, share int, memory, storage int64, labels map[string]string,
-	numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error) {
-*/
+// AddNode adds a node
 func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*types.Node, error) {
+	opts.Normalize()
 	return c.store.AddNode(ctx, opts)
 }
 
@@ -54,6 +50,7 @@ func (c *Calcium) GetNodes(ctx context.Context, podname, nodename string, labels
 func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) {
 	var n *types.Node
 	return n, c.withNodeLocked(ctx, opts.Nodename, func(node *types.Node) error {
+		opts.Normalize(node)
 		n = node
 		litter.Dump(opts)
 		n.Available = (opts.Status == cluster.NodeUp)
diff --git a/cluster/calcium/node_test.go b/cluster/calcium/node_test.go
index 7646de27f..a51b5256c 100644
--- a/cluster/calcium/node_test.go
+++ b/cluster/calcium/node_test.go
@@ -2,7 +2,6 @@ package calcium
 
 import (
 	"context"
-	"errors"
 	"testing"
 
 	lockmocks "github.com/projecteru2/core/lock/mocks"
@@ -231,8 +230,4 @@ func TestSetNode(t *testing.T) {
 	assert.False(t, ok)
 	assert.Equal(t, n.Volume["/sda0"], int64(5))
 	assert.Equal(t, n.Volume["/sda2"], int64(19))
-	// failed by nagative volume size
-	setOpts.DeltaVolume = types.VolumeMap{"/sda0": -100}
-	n, err = c.SetNode(ctx, setOpts)
-	assert.True(t, errors.Is(err, types.ErrBadVolume))
 }
diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go
index 712ced4d3..dc97a1b8c 100644
--- a/cluster/calcium/realloc.go
+++ b/cluster/calcium/realloc.go
@@ -147,6 +147,7 @@ func (c *Calcium) doReallocContainer(
 			node.CPU.Add(container.CPU)
 			node.SetCPUUsed(container.Quota, types.DecrUsage)
 			node.Volume.Add(container.VolumePlan.IntoVolumeMap())
+			node.StorageCap += container.Storage
 			node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.DecrUsage)
 			node.MemCap += container.Memory
 			if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
@@ -268,11 +269,13 @@ func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.
 
 func (c *Calcium) updateResource(ctx context.Context, node *types.Node, container *types.Container, newResource *enginetypes.VirtualizationResource) error {
 	if err := node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource); err == nil {
+		oldVolumes := container.Volumes
 		container.CPU = newResource.CPU
 		container.Quota = newResource.Quota
 		container.Memory = newResource.Memory
 		container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
 		container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
+		container.Storage += container.Volumes.TotalSize() - oldVolumes.TotalSize()
 	} else {
 		log.Errorf("[updateResource] When Realloc container, VirtualizationUpdateResource %s failed %v", container.ID, err)
 		return err
@@ -284,6 +287,7 @@ func (c *Calcium) updateResource(ctx context.Context, node *types.Node, containe
 	node.SetCPUUsed(container.Quota, types.IncrUsage)
 	node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
 	node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
+	node.StorageCap -= container.Storage
 	node.MemCap -= container.Memory
 	if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
 		node.DecrNUMANodeMemory(nodeID, container.Memory)
diff --git a/engine/docker/container.go b/engine/docker/container.go
index a45811ed8..596cd551e 100644
--- a/engine/docker/container.go
+++ b/engine/docker/container.go
@@ -10,6 +10,7 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
+	"strings"
 
 	"github.com/docker/go-connections/nat"
 	"github.com/docker/go-units"
@@ -128,7 +129,17 @@ func (e *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.Vir
 		}
 	}
 	if opts.Storage > 0 {
-		rArgs.StorageOpt["size"] = fmt.Sprintf("%v", opts.Storage)
+		volumeTotal := int64(0)
+		for volume, plan := range opts.VolumePlan {
+			if strings.HasPrefix(volume, "AUTO") {
+				for _, size := range plan {
+					volumeTotal += size
+				}
+			}
+		}
+		if opts.Storage-volumeTotal > 0 {
+			rArgs.StorageOpt["size"] = fmt.Sprintf("%v", opts.Storage-volumeTotal)
+		}
 	}
 	// 如果有指定用户,用指定用户
 	// 没有指定用户,用镜像自己的
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 644af55fa..19cbad302 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -71,7 +71,6 @@ func (m *Metrics) count(key string, n int, rate float32) error {
 
 // SendNodeInfo update node resource capacity
 func (m *Metrics) SendNodeInfo(node *types.Node) {
-	log.Debugf("[Metrics] Update %s metrics", node.Name)
 	nodename := node.Name
 	podname := node.Podname
 
diff --git a/rpc/transform.go b/rpc/transform.go
index f5602413c..fb3c59ee1 100644
--- a/rpc/transform.go
+++ b/rpc/transform.go
@@ -264,7 +264,7 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) {
 		CPUQuota:     d.CpuQuota,
 		CPUBind:      d.CpuBind,
 		Memory:       d.Memory,
-		Storage:      d.Storage + vbs.AdditionalStorage(),
+		Storage:      d.Storage,
 		Count:        int(d.Count),
 		Env:          d.Env,
 		DNS:          d.Dns,
diff --git a/types/options.go b/types/options.go
index bb7f9274b..8009302b9 100644
--- a/types/options.go
+++ b/types/options.go
@@ -37,6 +37,11 @@ type DeployOptions struct {
 	Lambda         bool              // indicate is lambda container or not
 }
 
+// Normalize keeps deploy options consistent
+func (o *DeployOptions) Normalize() {
+	o.Storage += o.Volumes.TotalSize()
+}
+
 // RunAndWaitOptions is options for running and waiting
 type RunAndWaitOptions struct {
 	DeployOptions
@@ -91,6 +96,11 @@ type AddNodeOptions struct {
 	Volume     VolumeMap
 }
 
+// Normalize keeps options consistent
+func (o *AddNodeOptions) Normalize() {
+	o.Storage += o.Volume.Total()
+}
+
 // SetNodeOptions for node set
 type SetNodeOptions struct {
 	Nodename        string
@@ -105,6 +115,16 @@ type SetNodeOptions struct {
 	Labels          map[string]string
 }
 
+// Normalize keeps options consistent
+func (o *SetNodeOptions) Normalize(node *Node) {
+	o.DeltaStorage += o.DeltaVolume.Total()
+	for volID, size := range o.DeltaVolume {
+		if size == 0 {
+			o.DeltaStorage -= node.InitVolume[volID]
+		}
+	}
+}
+
 // ExecuteContainerOptions for executing commands in running container
 type ExecuteContainerOptions struct {
 	ContainerID string
diff --git a/types/volume.go b/types/volume.go
index 778c38172..4c334d87d 100644
--- a/types/volume.go
+++ b/types/volume.go
@@ -135,16 +135,6 @@ func (vbs VolumeBindings) MarshalJSON() ([]byte, error) {
 	return json.Marshal(volumes)
 }
 
-// AdditionalStorage is used for another kind of resource: storage
-func (vbs VolumeBindings) AdditionalStorage() (storage int64) {
-	for _, vb := range vbs {
-		if !vb.RequireSchedule() {
-			storage += vb.SizeInBytes
-		}
-	}
-	return
-}
-
 // ApplyPlan creates new VolumeBindings according to volume plan
 func (vbs VolumeBindings) ApplyPlan(plan VolumePlan) (res VolumeBindings) {
 	for _, vb := range vbs {
diff --git a/types/volume_test.go b/types/volume_test.go
index 5be610eb5..728113ca0 100644
--- a/types/volume_test.go
+++ b/types/volume_test.go
@@ -85,7 +85,6 @@ func TestVolumeBindings(t *testing.T) {
 	vbs, _ := MakeVolumeBindings([]string{"/1:/dst:rw:1000", "/0:/dst:rom"})
 	assert.Equal(t, vbs.ToStringSlice(false, false), []string{"/1:/dst:rw:1000", "/0:/dst:rom:0"})
 	assert.Equal(t, vbs.ToStringSlice(true, false), []string{"/0:/dst:rom:0", "/1:/dst:rw:1000"})
-	assert.Equal(t, vbs.AdditionalStorage(), int64(1000))
 	assert.Equal(t, vbs.TotalSize(), int64(1000))
 	vbs1, _ := MakeVolumeBindings([]string{"AUTO:/data0:rw:1", "AUTO:/data1:rw:2", "/mnt1:/data2:rw", "/mnt2:/data3:ro"})
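
Below is a minimal, self-contained Go sketch of the accounting this patch introduces. The deployOptions type, storageOptSize helper and the plan map are simplified stand-ins for the real DeployOptions, VolumePlan and StorageOpt plumbing, not the eru-core API: Normalize folds the requested volume sizes into Storage so the scheduler reserves disk for rootfs and volumes together, and the docker engine later subtracts the sizes of scheduled ("AUTO") volumes again so only the remaining rootfs quota is passed as the storage-opt size.

package main

import (
	"fmt"
	"strings"
)

// deployOptions is a stand-in carrying only the fields the patch touches:
// a requested rootfs storage size plus the sizes of requested volumes.
type deployOptions struct {
	Storage     int64
	VolumeSizes []int64 // sizes parsed from bindings such as "AUTO:/data:rw:2147483648"
}

// normalize folds volume sizes into Storage, mirroring DeployOptions.Normalize,
// so scheduling accounts for rootfs and volumes against the node's StorageCap.
func (o *deployOptions) normalize() {
	for _, size := range o.VolumeSizes {
		o.Storage += size
	}
}

// storageOptSize mirrors the docker-engine side: subtract the sizes of
// scheduled ("AUTO") volumes from the normalized storage and report whether
// anything is left over to use as the rootfs quota.
func storageOptSize(storage int64, volumePlan map[string]map[string]int64) (int64, bool) {
	volumeTotal := int64(0)
	for volume, plan := range volumePlan {
		if strings.HasPrefix(volume, "AUTO") {
			for _, size := range plan {
				volumeTotal += size
			}
		}
	}
	if left := storage - volumeTotal; left > 0 {
		return left, true
	}
	return 0, false
}

func main() {
	// 1 GiB rootfs request plus a 2 GiB AUTO volume.
	opts := &deployOptions{Storage: 1 << 30, VolumeSizes: []int64{2 << 30}}
	opts.normalize()
	fmt.Println("scheduler reserves:", opts.Storage) // 3 GiB in total

	// Illustrative plan: the AUTO binding was placed on /sda1 with 2 GiB.
	plan := map[string]map[string]int64{
		"AUTO:/data:rw:2147483648": {"/sda1": 2 << 30},
	}
	if size, ok := storageOptSize(opts.Storage, plan); ok {
		fmt.Println("rootfs quota:", size) // back to the 1 GiB rootfs request
	}
}

With these numbers the scheduler accounts 3 GiB against the node while the container keeps a 1 GiB rootfs quota, which matches the realloc path above, where container.Storage is adjusted by the difference between the new and old volume sizes.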