Skip to content

Commit

Permalink
support raw res containers
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Mar 5, 2018
1 parent 8faa1ae commit 906b2d5
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 227 deletions.
47 changes: 29 additions & 18 deletions cluster/calcium/create_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@ func (c *calcium) CreateContainer(opts *types.DeployOptions) (chan *types.Create
log.Errorf("[CreateContainer] Error during GetPod for %s: %v", opts.Podname, err)
return nil, err
}
if pod.Favor == scheduler.CPU_PRIOR {
log.Infof("[CreateContainer] Creating container with options: %v", opts)
if opts.RawResource || pod.Favor == scheduler.MEMORY_PRIOR {
return c.createContainerWithMemoryPrior(opts)
} else if pod.Favor == scheduler.CPU_PRIOR {
return c.createContainerWithCPUPrior(opts)
}
log.Infof("[CreateContainer] Creating container with options: %v", opts)
return c.createContainerWithMemoryPrior(opts)
return nil, fmt.Errorf("[CreateContainer] Favor not support: %v", pod.Favor)
}

func (c *calcium) createContainerWithMemoryPrior(opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {
ch := make(chan *types.CreateContainerMessage)
if opts.Memory < minMemory { // 4194304 Byte = 4 MB, docker 创建容器的内存最低标准
// 4194304 Byte = 4 MB, docker 创建容器的内存最低标准
// -1 means without limit
if opts.Memory < minMemory && !opts.RawResource {
return ch, fmt.Errorf("Minimum memory limit allowed is 4MB, got %d", opts.Memory)
}
if opts.Count <= 0 { // Count 要大于0
Expand Down Expand Up @@ -86,11 +90,13 @@ func (c *calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, opts

node, err := c.getAndPrepareNode(opts.Podname, nodeInfo.Name, opts.Image)
if err != nil {
log.Errorf("[doCreateContainerWithCPUPrior] Get and prepare node error %v", err)
log.Errorf("[doCreateContainerWithMemoryPrior] Get and prepare node error %v", err)
for i := 0; i < nodeInfo.Deploy; i++ {
ms[i] = &types.CreateContainerMessage{Error: err}
if err := c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+"); err != nil {
log.Errorf("[doCreateContainerWithCPUPrior] reset node memory failed %v", err)
if !opts.RawResource {
if err := c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+"); err != nil {
log.Errorf("[doCreateContainerWithMemoryPrior] reset node memory failed %v", err)
}
}
}
return ms
Expand Down Expand Up @@ -298,8 +304,10 @@ func (c *calcium) makeContainerOptions(index int, quota types.CPUMap, opts *type
var resource enginecontainer.Resources
if favor == scheduler.CPU_PRIOR {
resource = c.makeCPUPriorSetting(quota)
} else {
} else if favor == scheduler.MEMORY_PRIOR {
resource = c.makeMemoryPriorSetting(opts.Memory, opts.CPUQuota)
} else {
return nil, nil, nil, "", fmt.Errorf("favor not support %s", favor)
}
resource.Ulimits = ulimits

Expand Down Expand Up @@ -356,22 +364,25 @@ func (c *calcium) getAndPrepareNode(podname, nodename, image string) (*types.Nod
func (c *calcium) createAndStartContainer(
no int, node *types.Node,
opts *types.DeployOptions,
quota types.CPUMap, typ string,
cpu types.CPUMap, typ string,
) *types.CreateContainerMessage {
container := &types.Container{
Podname: opts.Podname,
Nodename: node.Name,
CPU: quota,
Memory: opts.Memory,
Hook: opts.Entrypoint.Hook,
Privileged: opts.Entrypoint.Privileged,
Engine: node.Engine,
Podname: opts.Podname,
Nodename: node.Name,
CPU: cpu,
Quota: opts.CPUQuota,
Memory: opts.Memory,
Hook: opts.Entrypoint.Hook,
Privileged: opts.Entrypoint.Privileged,
RawResource: opts.RawResource,
Engine: node.Engine,
}
createContainerMessage := &types.CreateContainerMessage{
Podname: container.Podname,
Nodename: container.Nodename,
Success: false,
CPU: quota,
CPU: cpu,
Quota: opts.CPUQuota,
Memory: opts.Memory,
Publish: map[string]string{},
}
Expand All @@ -385,7 +396,7 @@ func (c *calcium) createAndStartContainer(
}()

// get config
config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(no, quota, opts, node, typ)
config, hostConfig, networkConfig, containerName, err := c.makeContainerOptions(no, cpu, opts, node, typ)
if err != nil {
createContainerMessage.Error = err
return createContainerMessage
Expand Down
13 changes: 8 additions & 5 deletions cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
)

func (c *calcium) makeMemoryPriorSetting(memory int64, cpu float64) enginecontainer.Resources {
resource := enginecontainer.Resources{
Memory: memory,
MemorySwap: memory,
CPUPeriod: utils.CpuPeriodBase,
CPUQuota: int64(cpu * float64(utils.CpuPeriodBase)),
resource := enginecontainer.Resources{}
if cpu > 0 {
resource.CPUPeriod = utils.CpuPeriodBase
resource.CPUQuota = int64(cpu * float64(utils.CpuPeriodBase))
}
if memory != -1 {
resource.Memory = memory
resource.MemorySwap = memory
}
return resource
}
Expand Down
5 changes: 5 additions & 0 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func (c *calcium) ReallocResource(ids []string, cpu float64, mem int64) (chan *t
nodeCache := map[string]*types.Node{}

for _, container := range containers {
if container.RawResource {
//TODO not support yet
return nil, fmt.Errorf("Realloc raw resource container not support yet")
}

if _, ok := nodeCache[container.Nodename]; !ok {
node, err := c.GetNode(container.Podname, container.Nodename)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cluster/calcium/remove_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ func (c *calcium) RemoveContainer(ids []string, force bool) (chan *types.RemoveC
// remove one container
func (c *calcium) removeOneContainer(container *types.Container) error {
defer func() {
// not deal with raw res container
if container.RawResource {
return
}
// if total cpu of container > 0, then we need to release these core resource
// but if it's 0, just ignore to save 1 time write on etcd.
if container.CPU.Total() > 0 {
Expand Down
15 changes: 12 additions & 3 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

log "github.com/Sirupsen/logrus"

"github.com/projecteru2/core/scheduler/complex"
"github.com/projecteru2/core/stats"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
Expand All @@ -19,7 +20,7 @@ func (c *calcium) allocMemoryPodResource(opts *types.DeployOptions) ([]types.Nod
}
defer lock.Unlock()

cpuandmem, _, err := c.getCPUAndMem(opts.Podname, opts.Nodename, opts.NodeLabels, 1.0)
cpuandmem, _, err := c.getCPUAndMem(opts.Podname, opts.Nodename, opts.NodeLabels)
if err != nil {
return nil, err
}
Expand All @@ -32,6 +33,14 @@ func (c *calcium) allocMemoryPodResource(opts *types.DeployOptions) ([]types.Nod
return nil, err
}

// 每台机器都允许部署所需容量容器
if opts.RawResource {
for i := range nodesInfo {
nodesInfo[i].Capacity = opts.Count
}
return complexscheduler.CommunismDivisionPlan(nodesInfo, opts.Count, opts.Count*len(nodesInfo))
}

cpuRate := int64(opts.CPUQuota * float64(utils.CpuPeriodBase))
log.Debugf("[allocMemoryPodResource] Input opts.CPUQuota: %f, equal CPURate %d", opts.CPUQuota, cpuRate)
sort.Slice(nodesInfo, func(i, j int) bool { return nodesInfo[i].MemCap < nodesInfo[j].MemCap })
Expand Down Expand Up @@ -64,7 +73,7 @@ func (c *calcium) allocCPUPodResource(opts *types.DeployOptions) (map[string][]t
}
defer lock.Unlock()

cpuandmem, nodes, err := c.getCPUAndMem(opts.Podname, opts.Nodename, opts.NodeLabels, opts.CPUQuota)
cpuandmem, nodes, err := c.getCPUAndMem(opts.Podname, opts.Nodename, opts.NodeLabels)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -121,7 +130,7 @@ func filterNode(node *types.Node, labels map[string]string) bool {
return true
}

func (c *calcium) getCPUAndMem(podname, nodename string, labels map[string]string, quota float64) (map[string]types.CPUAndMem, []*types.Node, error) {
func (c *calcium) getCPUAndMem(podname, nodename string, labels map[string]string) (map[string]types.CPUAndMem, []*types.Node, error) {
result := make(map[string]types.CPUAndMem)
var nodes []*types.Node
var err error
Expand Down
Loading

0 comments on commit 906b2d5

Please sign in to comment.