Skip to content

Commit

Permalink
support realloc raw resource container
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Mar 5, 2018
1 parent 906b2d5 commit 76c781e
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,11 @@ func (c *calcium) ReallocResource(ids []string, cpu float64, mem int64) (chan *t
containersInfo := map[*types.Pod]NodeContainers{}
// Pod-cpu-node-containers 四元组
cpuContainersInfo := map[*types.Pod]CPUNodeContainers{}
// Raw resource container Node-Containers
rawResourceContainers := NodeContainers{}
nodeCache := map[string]*types.Node{}

for _, container := range containers {
if container.RawResource {
//TODO not support yet
return nil, fmt.Errorf("Realloc raw resource container not support yet")
}

if _, ok := nodeCache[container.Nodename]; !ok {
node, err := c.GetNode(container.Podname, container.Nodename)
if err != nil {
Expand All @@ -45,6 +42,14 @@ func (c *calcium) ReallocResource(ids []string, cpu float64, mem int64) (chan *t
}
node := nodeCache[container.Nodename]

if container.RawResource {
if _, ok := rawResourceContainers[node]; !ok {
rawResourceContainers[node] = []*types.Container{}
}
rawResourceContainers[node] = append(rawResourceContainers[node], container)
continue
}

pod, err := c.store.GetPod(container.Podname)
if err != nil {
return nil, err
Expand Down Expand Up @@ -79,7 +84,17 @@ func (c *calcium) ReallocResource(ids []string, cpu float64, mem int64) (chan *t
go func() {
defer close(ch)
wg := sync.WaitGroup{}
wg.Add(len(containersInfo))
wg.Add(len(containersInfo) + len(rawResourceContainers))

// deal with container with raw resource
for node, containers := range rawResourceContainers {
go func(node *types.Node, containers []*types.Container) {
defer wg.Done()
c.doUpdateContainerWithMemoryPrior(ch, node.Podname, node, containers, cpu, mem)
}(node, containers)
}

// deal with normal container
for pod, nodeContainers := range containersInfo {
if pod.Favor == scheduler.CPU_PRIOR {
nodeCPUContainersInfo := cpuContainersInfo[pod]
Expand Down Expand Up @@ -173,15 +188,15 @@ func (c *calcium) doUpdateContainerWithMemoryPrior(
log.Errorf("[doUpdateContainerWithMemoryPrior] update container failed %v, %s", err, containerJSON.ID)
ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: false}
// 如果是增加内存,失败的时候应该把内存还回去
if memory > 0 {
if memory > 0 && !container.RawResource {
if err := c.store.UpdateNodeMem(podname, node.Name, memory, "+"); err != nil {
log.Errorf("[doUpdateContainerWithMemoryPrior] failed to set mem back %s", containerJSON.ID)
}
}
continue
}
// 如果是要降低内存,当执行成功的时候需要把内存还回去
if memory < 0 {
if memory < 0 && !container.RawResource {
if err := c.store.UpdateNodeMem(podname, node.Name, -memory, "+"); err != nil {
log.Errorf("[doUpdateContainerWithMemoryPrior] failed to set mem back %s", containerJSON.ID)
}
Expand Down

0 comments on commit 76c781e

Please sign in to comment.