storage follows volume sizes (#186)
* storage coordinates with volume

* normalize deploy resource requirements

* realloc api will update containers' storage following volumes

* docker engine can handle storage with volume size

* bump version: 04.02

* delete metrics log flood

* fix unittest

* add comments for exported functions
zc authored Apr 17, 2020
1 parent 5e2333c commit 8607785
Showing 11 changed files with 43 additions and 26 deletions.
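
The gist of the change: a container's storage requirement now follows its volume sizes, so node storage capacity is charged for both, while the Docker engine takes the scheduler-managed (AUTO) volume sizes back out before sizing the rootfs. A rough standalone sketch of that accounting — the numbers and names here are illustrative, not the core API:

package main

import "fmt"

func main() {
	// Illustrative numbers only.
	explicitStorage := int64(10 << 30) // storage requested directly
	bindVolume := int64(1 << 30)       // e.g. /mnt1:/data1:rw:1G, lives on a host path
	autoVolume := int64(2 << 30)       // e.g. AUTO:/data0:rw:2G, placed by the scheduler

	// DeployOptions.Normalize: Storage absorbs every volume size,
	// so this is what gets charged against the node's StorageCap.
	normalized := explicitStorage + bindVolume + autoVolume

	// Docker engine: AUTO volumes are placed on the node's own volume devices,
	// so their total is subtracted again before setting the rootfs limit
	// (storage-opt "size").
	rootfs := normalized - autoVolume

	fmt.Println("charged against node StorageCap:", normalized) // 13958643712
	fmt.Println("docker storage-opt size:", rootfs)             // 11811160064
}

If the removed AdditionalStorage helper covered exactly the non-AUTO bindings, the size handed to Docker works out the same as before; the visible change is that node StorageCap is now debited for volume sizes as well.
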
VERSION (1 addition, 1 deletion)
@@ -1 +1 @@
-20.04.01
+20.04.02

cluster/calcium/create.go (2 additions)
@@ -18,6 +18,7 @@ import (

// CreateContainer use options to create containers
func (c *Calcium) CreateContainer(ctx context.Context, opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {
+opts.Normalize()
opts.ProcessIdent = utils.RandomString(16)
pod, err := c.store.GetPod(ctx, opts.Podname)
if err != nil {
@@ -277,6 +278,7 @@ func (c *Calcium) doMakeContainerOptions(index int, cpumap types.CPUMap, volumeP
config.Stdin = opts.OpenStdin
config.Hosts = opts.ExtraHosts
config.Volumes = opts.Volumes.ApplyPlan(volumePlan).ToStringSlice(false, true)
+config.VolumePlan = volumePlan.ToLiteral()
config.Debug = opts.Debug
config.Network = opts.NetworkMode
config.Networks = opts.Networks

cluster/calcium/node.go (3 additions, 6 deletions)
@@ -9,13 +9,9 @@ import (
log "github.com/sirupsen/logrus"
)

-// AddNode add a node in pod
-/*
-func (c *Calcium) AddNode(ctx context.Context, nodename, endpoint, podname, ca, cert, key string,
-cpu, share int, memory, storage int64, labels map[string]string,
-numa types.NUMA, numaMemory types.NUMAMemory) (*types.Node, error) {
-*/
+// AddNode adds a node
func (c *Calcium) AddNode(ctx context.Context, opts *types.AddNodeOptions) (*types.Node, error) {
+opts.Normalize()
return c.store.AddNode(ctx, opts)
}

@@ -54,6 +50,7 @@ func (c *Calcium) GetNodes(ctx context.Context, podname, nodename string, labels
func (c *Calcium) SetNode(ctx context.Context, opts *types.SetNodeOptions) (*types.Node, error) {
var n *types.Node
return n, c.withNodeLocked(ctx, opts.Nodename, func(node *types.Node) error {
+opts.Normalize(node)
n = node
litter.Dump(opts)
n.Available = (opts.Status == cluster.NodeUp)

cluster/calcium/node_test.go (5 deletions)
@@ -2,7 +2,6 @@ package calcium

import (
"context"
-"errors"
"testing"

lockmocks "github.com/projecteru2/core/lock/mocks"
@@ -231,8 +230,4 @@ func TestSetNode(t *testing.T) {
assert.False(t, ok)
assert.Equal(t, n.Volume["/sda0"], int64(5))
assert.Equal(t, n.Volume["/sda2"], int64(19))
-// failed by nagative volume size
-setOpts.DeltaVolume = types.VolumeMap{"/sda0": -100}
-n, err = c.SetNode(ctx, setOpts)
-assert.True(t, errors.Is(err, types.ErrBadVolume))
}

cluster/calcium/realloc.go (4 additions)
@@ -147,6 +147,7 @@ func (c *Calcium) doReallocContainer(
node.CPU.Add(container.CPU)
node.SetCPUUsed(container.Quota, types.DecrUsage)
node.Volume.Add(container.VolumePlan.IntoVolumeMap())
+node.StorageCap += container.Storage
node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.DecrUsage)
node.MemCap += container.Memory
if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
@@ -268,11 +269,13 @@ func (c *Calcium) updateContainersResources(ctx context.Context, ch chan *types.

func (c *Calcium) updateResource(ctx context.Context, node *types.Node, container *types.Container, newResource *enginetypes.VirtualizationResource) error {
if err := node.Engine.VirtualizationUpdateResource(ctx, container.ID, newResource); err == nil {
+oldVolumes := container.Volumes
container.CPU = newResource.CPU
container.Quota = newResource.Quota
container.Memory = newResource.Memory
container.Volumes, _ = types.MakeVolumeBindings(newResource.Volumes)
container.VolumePlan = types.MustToVolumePlan(newResource.VolumePlan)
+container.Storage += container.Volumes.TotalSize() - oldVolumes.TotalSize()
} else {
log.Errorf("[updateResource] When Realloc container, VirtualizationUpdateResource %s failed %v", container.ID, err)
return err
@@ -284,6 +287,7 @@ func (c *Calcium) updateResource(ctx context.Context, node *types.Node, containe
node.SetCPUUsed(container.Quota, types.IncrUsage)
node.Volume.Sub(container.VolumePlan.IntoVolumeMap())
node.SetVolumeUsed(container.VolumePlan.IntoVolumeMap().Total(), types.IncrUsage)
+node.StorageCap -= container.Storage
node.MemCap -= container.Memory
if nodeID := node.GetNUMANode(container.CPU); nodeID != "" {
node.DecrNUMANodeMemory(nodeID, container.Memory)

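Realloc keeps the same bookkeeping symmetric: the container's old Storage (volume sizes included) is handed back to the node, Storage is shifted by the change in total volume size once the engine accepts the new resources, and the updated Storage is deducted again. A compact sketch of that delta logic with trimmed stand-in types (not the real Container, Node, or VolumeBindings):

package main

import "fmt"

// volumeBinding is a stand-in for a VolumeBindings entry, keeping only
// the source and a declared size.
type volumeBinding struct {
	Source string
	Size   int64
}

type volumeBindings []volumeBinding

// totalSize mirrors what VolumeBindings.TotalSize is assumed to do:
// sum the declared sizes of all bindings.
func (vbs volumeBindings) totalSize() (total int64) {
	for _, vb := range vbs {
		total += vb.Size
	}
	return
}

type container struct {
	Storage int64
	Volumes volumeBindings
}

type node struct {
	StorageCap int64
}

// reallocStorage follows the pattern above: give the old storage back,
// shift the container's storage by the volume-size delta, then charge
// the new requirement against the node.
func reallocStorage(n *node, c *container, newVolumes volumeBindings) {
	n.StorageCap += c.Storage // return what the container held before

	oldVolumes := c.Volumes
	c.Volumes = newVolumes
	c.Storage += newVolumes.totalSize() - oldVolumes.totalSize()

	n.StorageCap -= c.Storage // deduct the updated requirement
}

func main() {
	n := &node{StorageCap: 100}
	c := &container{Storage: 13, Volumes: volumeBindings{{"AUTO", 3}}}

	// Grow the AUTO volume from 3 to 5: container storage goes 13 -> 15,
	// node capacity drops by the extra 2.
	reallocStorage(n, c, volumeBindings{{"AUTO", 5}})
	fmt.Println(c.Storage, n.StorageCap) // 15 98
}
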
engine/docker/container.go (12 additions, 1 deletion)
@@ -10,6 +10,7 @@ import (
"os"
"path/filepath"
"strconv"
+"strings"

"github.com/docker/go-connections/nat"
"github.com/docker/go-units"
@@ -128,7 +129,17 @@ func (e *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.Vir
}
}
if opts.Storage > 0 {
-rArgs.StorageOpt["size"] = fmt.Sprintf("%v", opts.Storage)
+volumeTotal := int64(0)
+for volume, plan := range opts.VolumePlan {
+if strings.HasPrefix(volume, "AUTO") {
+for _, size := range plan {
+volumeTotal += size
+}
+}
+}
+if opts.Storage-volumeTotal > 0 {
+rArgs.StorageOpt["size"] = fmt.Sprintf("%v", opts.Storage-volumeTotal)
+}
}
// If a user is specified, use that user
// otherwise fall back to the image's own user
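
The new branch above only charges the rootfs for storage that is not already provided by scheduler-managed volumes: AUTO entries in the volume plan are summed and subtracted from opts.Storage, and the storage option is skipped when nothing is left. A standalone sketch of that computation — the map shape mirrors what the literal volume plan appears to be (binding string to per-device sizes), but the inputs are made up:

package main

import (
	"fmt"
	"strings"
)

// rootfsSize mirrors the logic above: subtract the sizes of AUTO
// (scheduler-managed) volumes from the requested storage; the result is what
// would go into docker's storage-opt "size", and 0 means the option is omitted.
func rootfsSize(storage int64, volumePlan map[string]map[string]int64) int64 {
	if storage <= 0 {
		return 0
	}
	volumeTotal := int64(0)
	for volume, plan := range volumePlan {
		if strings.HasPrefix(volume, "AUTO") {
			for _, size := range plan {
				volumeTotal += size
			}
		}
	}
	if storage-volumeTotal > 0 {
		return storage - volumeTotal
	}
	return 0
}

func main() {
	plan := map[string]map[string]int64{
		"AUTO:/data0:rw:1073741824": {"/dev/sdb1": 1073741824}, // 1 GiB carved out by the scheduler
	}
	// 10 GiB requested overall; 1 GiB of it is already covered by the AUTO volume.
	fmt.Println(rootfsSize(10<<30, plan)) // 9663676416
}
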
metrics/metrics.go (1 deletion)
@@ -71,7 +71,6 @@ func (m *Metrics) count(key string, n int, rate float32) error {

// SendNodeInfo update node resource capacity
func (m *Metrics) SendNodeInfo(node *types.Node) {
-log.Debugf("[Metrics] Update %s metrics", node.Name)
nodename := node.Name
podname := node.Podname

rpc/transform.go (1 addition, 1 deletion)
@@ -264,7 +264,7 @@ func toCoreDeployOptions(d *pb.DeployOptions) (*types.DeployOptions, error) {
CPUQuota: d.CpuQuota,
CPUBind: d.CpuBind,
Memory: d.Memory,
-Storage: d.Storage + vbs.AdditionalStorage(),
+Storage: d.Storage,
Count: int(d.Count),
Env: d.Env,
DNS: d.Dns,

types/options.go (20 additions)
@@ -37,6 +37,11 @@ type DeployOptions struct {
Lambda bool // indicate is lambda container or not
}

+// Normalize keeps deploy options consistent
+func (o *DeployOptions) Normalize() {
+o.Storage += o.Volumes.TotalSize()
+}

// RunAndWaitOptions is options for running and waiting
type RunAndWaitOptions struct {
DeployOptions
@@ -91,6 +96,11 @@ type AddNodeOptions struct {
Volume VolumeMap
}

+// Normalize keeps options consistent
+func (o *AddNodeOptions) Normalize() {
+o.Storage += o.Volume.Total()
+}

// SetNodeOptions for node set
type SetNodeOptions struct {
Nodename string
@@ -105,6 +115,16 @@ type SetNodeOptions struct {
Labels map[string]string
}

+// Normalize keeps options consistent
+func (o *SetNodeOptions) Normalize(node *Node) {
+o.DeltaStorage += o.DeltaVolume.Total()
+for volID, size := range o.DeltaVolume {
+if size == 0 {
+o.DeltaStorage -= node.InitVolume[volID]
+}
+}
+}

// ExecuteContainerOptions for executing commands in running container
type ExecuteContainerOptions struct {
ContainerID string

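The three Normalize methods apply one idea at three levels: a deploy's Storage absorbs the total size of its volume bindings, a new node's Storage absorbs its volume capacity, and SetNode's DeltaStorage absorbs the volume deltas — where a delta of 0 is read as "remove this volume", so that volume's initial capacity is given back as well. A sketch of the SetNode case, the least obvious of the three, using trimmed stand-in types rather than the real ones:

package main

import "fmt"

// volumeMap stands in for types.VolumeMap: device path -> size in bytes.
type volumeMap map[string]int64

func (vm volumeMap) total() (t int64) {
	for _, size := range vm {
		t += size
	}
	return
}

// setNodeOptions is a trimmed stand-in for types.SetNodeOptions.
type setNodeOptions struct {
	DeltaStorage int64
	DeltaVolume  volumeMap
}

// normalize mirrors the SetNodeOptions.Normalize shown above: every volume
// delta is also a storage delta, and a delta of 0 means "remove this volume",
// so that volume's initial capacity is subtracted on top.
func (o *setNodeOptions) normalize(initVolume volumeMap) {
	o.DeltaStorage += o.DeltaVolume.total()
	for volID, size := range o.DeltaVolume {
		if size == 0 {
			o.DeltaStorage -= initVolume[volID]
		}
	}
}

func main() {
	initVolume := volumeMap{"/sda0": 10, "/sda1": 7} // the node's initial volume capacities

	// Grow /sda1 by 5 and drop /sda0 entirely.
	opts := &setNodeOptions{DeltaVolume: volumeMap{"/sda1": 5, "/sda0": 0}}
	opts.normalize(initVolume)

	// +5 for /sda1, -10 for the removed /sda0 => net storage delta of -5.
	fmt.Println(opts.DeltaStorage) // -5
}
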
types/volume.go (10 deletions)
@@ -135,16 +135,6 @@ func (vbs VolumeBindings) MarshalJSON() ([]byte, error) {
return json.Marshal(volumes)
}

-// AdditionalStorage is used for another kind of resource: storage
-func (vbs VolumeBindings) AdditionalStorage() (storage int64) {
-for _, vb := range vbs {
-if !vb.RequireSchedule() {
-storage += vb.SizeInBytes
-}
-}
-return
-}
-
// ApplyPlan creates new VolumeBindings according to volume plan
func (vbs VolumeBindings) ApplyPlan(plan VolumePlan) (res VolumeBindings) {
for _, vb := range vbs {

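With Storage normalized from the full volume total, the narrower AdditionalStorage helper becomes redundant: judging from its removed body, it only counted bindings that need no scheduling, whereas TotalSize counts every binding. A stand-in comparison of the two behaviours (RequireSchedule is approximated here as "source starts with AUTO", which is an assumption, not the real check):

package main

import (
	"fmt"
	"strings"
)

// binding is a simplified VolumeBinding keeping just source and size.
type binding struct {
	Source string
	Size   int64
}

type bindings []binding

// totalSize is what the code now relies on: the size of every binding.
func (vbs bindings) totalSize() (total int64) {
	for _, vb := range vbs {
		total += vb.Size
	}
	return
}

// additionalStorage reproduces the removed helper's behaviour as shown in the
// diff: only bindings that do not need scheduling (approximated here as
// non-AUTO sources) counted as extra storage.
func (vbs bindings) additionalStorage() (storage int64) {
	for _, vb := range vbs {
		if !strings.HasPrefix(vb.Source, "AUTO") {
			storage += vb.Size
		}
	}
	return
}

func main() {
	vbs := bindings{
		{Source: "/mnt1", Size: 1000}, // bind mount with a size hint
		{Source: "AUTO", Size: 2000},  // scheduler-managed volume
	}
	fmt.Println(vbs.additionalStorage()) // 1000: only the bind mount
	fmt.Println(vbs.totalSize())         // 3000: everything
}
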
types/volume_test.go (1 deletion)
@@ -85,7 +85,6 @@ func TestVolumeBindings(t *testing.T) {
vbs, _ := MakeVolumeBindings([]string{"/1:/dst:rw:1000", "/0:/dst:rom"})
assert.Equal(t, vbs.ToStringSlice(false, false), []string{"/1:/dst:rw:1000", "/0:/dst:rom:0"})
assert.Equal(t, vbs.ToStringSlice(true, false), []string{"/0:/dst:rom:0", "/1:/dst:rw:1000"})
-assert.Equal(t, vbs.AdditionalStorage(), int64(1000))
assert.Equal(t, vbs.TotalSize(), int64(1000))

vbs1, _ := MakeVolumeBindings([]string{"AUTO:/data0:rw:1", "AUTO:/data1:rw:2", "/mnt1:/data2:rw", "/mnt2:/data3:ro"})
