diff --git a/cluster/calcium/control.go b/cluster/calcium/control.go
index ceb8abd41..24efe7aeb 100644
--- a/cluster/calcium/control.go
+++ b/cluster/calcium/control.go
@@ -2,12 +2,10 @@ package calcium
 
 import (
 	"context"
-	"fmt"
 	"sync"
 
 	enginetypes "github.com/docker/docker/api/types"
 	"github.com/projecteru2/core/cluster"
-	"github.com/projecteru2/core/lock"
 	"github.com/projecteru2/core/types"
 	log "github.com/sirupsen/logrus"
 )
@@ -134,25 +132,3 @@ func (c *Calcium) doRemoveContainer(ctx context.Context, container *types.Container) error {
 
 	return c.store.RemoveContainer(ctx, container)
 }
-
-func (c *Calcium) doLockContainer(ctx context.Context, container *types.Container) (*types.Container, enginetypes.ContainerJSON, lock.DistributedLock, error) {
-	lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, container.ID), int(c.config.GlobalTimeout.Seconds()))
-	if err != nil {
-		return container, enginetypes.ContainerJSON{}, nil, err
-	}
-	log.Debugf("[doLockContainer] Container %s locked", container.ID)
-	// make sure the container exists
-	containerJSON, err := container.Inspect(ctx)
-	if err != nil {
-		lock.Unlock(ctx)
-		return container, enginetypes.ContainerJSON{}, nil, err
-	}
-	// refresh container meta,
-	// it may have been realloc'd while we were waiting for the lock
-	rContainer, err := c.store.GetContainer(ctx, container.ID)
-	if err != nil {
-		lock.Unlock(ctx)
-		return container, enginetypes.ContainerJSON{}, nil, err
-	}
-	return rContainer, containerJSON, lock, nil
-}
diff --git a/cluster/calcium/helper.go b/cluster/calcium/helper.go
index 63f44c819..bc79b6146 100644
--- a/cluster/calcium/helper.go
+++ b/cluster/calcium/helper.go
@@ -19,25 +19,12 @@ import (
 	engineapi "github.com/docker/docker/client"
 	"github.com/docker/docker/registry"
 	"github.com/projecteru2/core/cluster"
-	"github.com/projecteru2/core/lock"
 	"github.com/projecteru2/core/types"
 	"github.com/projecteru2/core/utils"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/net/context"
 )
 
-// Lock is lock for calcium
-func (c *Calcium) Lock(ctx context.Context, name string, timeout int) (lock.DistributedLock, error) {
-	lock, err := c.store.CreateLock(name, timeout)
-	if err != nil {
-		return nil, err
-	}
-	if err = lock.Lock(ctx); err != nil {
-		return nil, err
-	}
-	return lock, nil
-}
-
 func makeResourceSetting(cpu float64, memory int64, cpuMap types.CPUMap, softlimit bool) enginecontainer.Resources {
 	resource := enginecontainer.Resources{}
 	if cpu > 0 {
diff --git a/cluster/calcium/lock.go b/cluster/calcium/lock.go
new file mode 100644
index 000000000..b959e688b
--- /dev/null
+++ b/cluster/calcium/lock.go
@@ -0,0 +1,89 @@
+package calcium
+
+import (
+	"context"
+	"fmt"
+
+	enginetypes "github.com/docker/docker/api/types"
+	"github.com/projecteru2/core/cluster"
+	"github.com/projecteru2/core/lock"
+	"github.com/projecteru2/core/types"
+	log "github.com/sirupsen/logrus"
+)
+
+// Lock acquires a named distributed lock for calcium
+func (c *Calcium) Lock(ctx context.Context, name string, timeout int) (lock.DistributedLock, error) {
+	lock, err := c.store.CreateLock(name, timeout)
+	if err != nil {
+		return nil, err
+	}
+	if err = lock.Lock(ctx); err != nil {
+		return nil, err
+	}
+	return lock, nil
+}
+
+// UnlockAll releases all the given locks
+func (c *Calcium) UnlockAll(ctx context.Context, locks map[string]lock.DistributedLock) {
+	for _, lock := range locks {
+		if err := lock.Unlock(ctx); err != nil {
+			log.Errorf("[UnlockAll] Unlock failed %v", err)
+		}
+	}
+}
+
+// LockAndGetContainers locks and gets multiple containers
+func (c *Calcium) LockAndGetContainers(ctx context.Context, IDs []string) (map[string]*types.Container, map[string]enginetypes.ContainerJSON, map[string]lock.DistributedLock, error) {
+	containers := map[string]*types.Container{}
+	containerJSONs := map[string]enginetypes.ContainerJSON{}
+	locks := map[string]lock.DistributedLock{}
+	for _, ID := range IDs {
+		container, containerJSON, lock, err := c.LockAndGetContainer(ctx, ID)
+		if err != nil {
+			c.UnlockAll(ctx, locks)
+			return nil, nil, nil, err
+		}
+		containers[ID] = container
+		containerJSONs[ID] = containerJSON
+		locks[ID] = lock
+	}
+	return containers, containerJSONs, locks, nil
+}
+
+// LockAndGetContainer locks and gets one container
+func (c *Calcium) LockAndGetContainer(ctx context.Context, ID string) (*types.Container, enginetypes.ContainerJSON, lock.DistributedLock, error) {
+	lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, ID), c.config.LockTimeout)
+	if err != nil {
+		return nil, enginetypes.ContainerJSON{}, nil, err
+	}
+	log.Debugf("[LockAndGetContainer] Container %s locked", ID)
+	// Get container
+	container, err := c.store.GetContainer(ctx, ID)
+	if err != nil {
+		lock.Unlock(ctx)
+		return nil, enginetypes.ContainerJSON{}, nil, err
+	}
+	// make sure the container exists
+	containerJSON, err := container.Inspect(ctx)
+	if err != nil {
+		lock.Unlock(ctx)
+		return nil, enginetypes.ContainerJSON{}, nil, err
+	}
+	return container, containerJSON, lock, nil
+}
+
+// LockAndGetNode locks and gets one node
+func (c *Calcium) LockAndGetNode(ctx context.Context, podname, nodename string) (*types.Node, lock.DistributedLock, error) {
+	lock, err := c.Lock(ctx, fmt.Sprintf(cluster.NodeLock, podname, nodename), c.config.LockTimeout)
+	if err != nil {
+		return nil, nil, err
+	}
+	log.Debugf("[LockAndGetNode] Node %s locked", nodename)
+	// Get node
+	node, err := c.GetNode(ctx, podname, nodename)
+	if err != nil {
+		lock.Unlock(ctx)
+		return nil, nil, err
+	}
+	return node, lock, nil
+}
diff --git a/cluster/calcium/realloc.go b/cluster/calcium/realloc.go
index 8c7934195..84d0c760b 100644
--- a/cluster/calcium/realloc.go
+++ b/cluster/calcium/realloc.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"sync"
 
+	"github.com/sanity-io/litter"
+
 	"github.com/projecteru2/core/lock"
 
 	enginecontainer "github.com/docker/docker/api/types/container"
@@ -14,62 +16,69 @@ import (
 
 // ReallocResource allow realloc container resource
 func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64, mem int64) (chan *types.ReallocResourceMessage, error) {
-	containers, err := c.store.GetContainers(ctx, IDs)
-	if err != nil {
-		return nil, err
-	}
-
 	ch := make(chan *types.ReallocResourceMessage)
 	go func() {
 		defer close(ch)
-		// Pod-Node-Containers triples
+		// Container objs and their locks
+		containers, _, containerLocks, err := c.LockAndGetContainers(ctx, IDs)
+		if err != nil {
+			log.Errorf("[ReallocResource] Lock and get containers failed %v", err)
+			return
+		}
+		defer c.UnlockAll(ctx, containerLocks)
+		// Pod-Node-Containers
 		containersInfo := map[*types.Pod]NodeContainers{}
-		// container locked
-		containerLocks := map[string]lock.DistributedLock{}
-		defer func() {
-			for ID, lock := range containerLocks {
-				log.Debugf("[ReallocResource] Container %s unlocked", ID)
-				lock.Unlock(ctx)
-			}
-		}()
-		for _, container := range containers {
-			container, _, lock, err := c.doLockContainer(ctx, container)
-			if err != nil {
-				ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
-				continue
-			}
-			node := container.Node
-			pod, err := c.store.GetPod(ctx, container.Podname)
-			if err != nil {
-				ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
-				lock.Unlock(ctx)
-				continue
-			}
-			containerLocks[container.ID] = lock
+		// Pod cache
+		podCache := map[string]*types.Pod{}
+		// Node locks
+		nodeLocks := map[string]lock.DistributedLock{}
+		defer c.UnlockAll(ctx, nodeLocks)
+		// Node cache
+		nodeCache := map[string]*types.Node{}
 
-			if _, ok := containersInfo[pod]; !ok {
+		for _, container := range containers {
+			pod, ok := podCache[container.Podname]
+			if !ok {
+				pod, err = c.store.GetPod(ctx, container.Podname)
+				if err != nil {
+					ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+					continue
+				}
+				podCache[container.Podname] = pod
 				containersInfo[pod] = NodeContainers{}
 			}
-			if _, ok := containersInfo[pod][node]; !ok {
-				containersInfo[pod][node] = []*types.Container{}
+			// node not locked yet
+			if _, ok := nodeLocks[container.Nodename]; !ok {
+				node, nodeLock, err := c.LockAndGetNode(ctx, container.Podname, container.Nodename)
+				if err != nil {
+					ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+					continue
+				}
+				nodeLocks[container.Nodename] = nodeLock
+				containersInfo[pod][node] = []*types.Container{container}
+				nodeCache[container.Nodename] = node
+				continue
 			}
+			// node already locked
+			node := nodeCache[container.Nodename]
 			containersInfo[pod][node] = append(containersInfo[pod][node], container)
 		}
+
 		wg := sync.WaitGroup{}
 		wg.Add(len(containersInfo))
 		// deal with normal container
 		for pod, nodeContainers := range containersInfo {
 			switch pod.Favor {
-			case scheduler.CPU_PRIOR:
-				go func(pod *types.Pod, nodeContainers NodeContainers) {
-					defer wg.Done()
-					c.reallocContainersWithCPUPrior(ctx, ch, pod, nodeContainers, cpu, mem)
-				}(pod, nodeContainers)
 			case scheduler.MEMORY_PRIOR:
-				go func(pod *types.Pod, nodeContainers NodeContainers) {
+				go func(nodeContainers NodeContainers) {
 					defer wg.Done()
-					c.reallocContainerWithMemoryPrior(ctx, ch, pod, nodeContainers, cpu, mem)
-				}(pod, nodeContainers)
+					c.reallocContainerWithMemoryPrior(ctx, ch, nodeContainers, cpu, mem)
+				}(nodeContainers)
+			case scheduler.CPU_PRIOR:
+				go func(nodeContainers NodeContainers) {
+					defer wg.Done()
+					c.reallocContainersWithCPUPrior(ctx, ch, nodeContainers, cpu, mem)
+				}(nodeContainers)
 			default:
 				log.Errorf("[ReallocResource] %v not support yet", pod.Favor)
 				go func(nodeContainers NodeContainers) {
@@ -90,13 +99,12 @@ func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64, mem int64) (chan *types.ReallocResourceMessage, error) {
 func (c *Calcium) reallocContainerWithMemoryPrior(
 	ctx context.Context,
 	ch chan *types.ReallocResourceMessage,
-	pod *types.Pod,
 	nodeContainers NodeContainers,
 	cpu float64, memory int64) {
 
 	// memory < 0 needs no special handling here: such a realloc only makes the memory recorded on the node larger than the total the containers actually own, which cannot OOM
 	if memory > 0 {
-		if err := c.reallocNodesMemory(ctx, pod.Name, nodeContainers, memory); err != nil {
+		if err := c.reallocNodesMemory(ctx, nodeContainers, memory); err != nil {
 			log.Errorf("[reallocContainerWithMemoryPrior] realloc memory failed %v", err)
 			for _, containers := range nodeContainers {
 				for _, container := range containers {
@@ -107,26 +115,16 @@ func (c *Calcium) reallocContainerWithMemoryPrior(
 		}
 	}
 
-	// no more concurrency here
-	for node, containers := range nodeContainers {
-		c.doUpdateContainerWithMemoryPrior(ctx, ch, pod.Name, node, containers, cpu, memory)
-	}
+	c.doUpdateContainerWithMemoryPrior(ctx, ch, nodeContainers, cpu, memory)
 }
 
 // only consider the incremental memory consumption
-func (c *Calcium) reallocNodesMemory(ctx context.Context, podname string, nodeContainers NodeContainers, memory int64) error {
-	lock, err := c.Lock(ctx, podname, c.config.LockTimeout)
-	if err != nil {
-		return err
-	}
-	defer lock.Unlock(ctx)
+func (c *Calcium) reallocNodesMemory(ctx context.Context, nodeContainers NodeContainers, memory int64) error {
+	// only check whether the increment can be satisfied
 	for node, containers := range nodeContainers {
 		if cap := int(node.MemCap / memory); cap < len(containers) {
 			return types.NewDetailedErr(types.ErrInsufficientRes, node.Name)
 		}
-		if err := c.store.UpdateNodeResource(ctx, podname, node.Name, types.CPUMap{}, int64(len(containers))*memory, "-"); err != nil {
-			return err
-		}
 	}
 	return nil
 }
@@ -134,58 +132,54 @@ func (c *Calcium) reallocNodesMemory(ctx context.Context, nodeContainers NodeContainers, memory int64) error {
 func (c *Calcium) doUpdateContainerWithMemoryPrior(
 	ctx context.Context,
 	ch chan *types.ReallocResourceMessage,
-	podname string,
-	node *types.Node,
-	containers []*types.Container,
+	nodeContainers NodeContainers,
 	cpu float64, memory int64) {
-
-	for _, container := range containers {
-		newCPU := container.Quota + cpu
-		newMemory := container.Memory + memory
-		// memory cannot go below 4MB
-		if newCPU <= 0 || newMemory <= minMemory {
-			log.Errorf("[doUpdateContainerWithMemoryPrior] new resource invaild %s, %f, %d", container.ID, newCPU, newMemory)
-			ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
-			continue
-		}
-		log.Debugf("[doUpdateContainerWithMemoryPrior] cpu: %f, mem: %d", newCPU, newMemory)
-		// CPUQuota not cpu
-		newResource := makeResourceSetting(newCPU, newMemory, nil, container.SoftLimit)
-		updateConfig := enginecontainer.UpdateConfig{Resources: newResource}
-		if _, err := node.Engine.ContainerUpdate(ctx, container.ID, updateConfig); err != nil {
-			log.Errorf("[doUpdateContainerWithMemoryPrior] update container failed %v, %s", err, container.ID)
-			ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
-			// if memory was increased, give it back on failure
+	for node, containers := range nodeContainers {
+		for _, container := range containers {
+			newCPU := container.Quota + cpu
+			newMemory := container.Memory + memory
+			// memory cannot go below 4MB
+			if newCPU <= 0 || newMemory <= minMemory {
+				log.Errorf("[doUpdateContainerWithMemoryPrior] new resource invalid %s, %f, %d", container.ID, newCPU, newMemory)
+				ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+				continue
+			}
+			log.Debugf("[doUpdateContainerWithMemoryPrior] container %s: cpu: %f, mem: %d", container.ID, newCPU, newMemory)
+			// CPUQuota not cpu
+			newResource := makeResourceSetting(newCPU, newMemory, nil, container.SoftLimit)
+			updateConfig := enginecontainer.UpdateConfig{Resources: newResource}
+			if _, err := node.Engine.ContainerUpdate(ctx, container.ID, updateConfig); err != nil {
+				log.Errorf("[doUpdateContainerWithMemoryPrior] update container failed %v, %s", err, container.ID)
+				ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+				continue
+			}
+			// record the memory change on success
 			if memory > 0 {
-				if err := c.store.UpdateNodeResource(ctx, podname, node.Name, types.CPUMap{}, memory, "+"); err != nil {
-					log.Errorf("[doUpdateContainerWithMemoryPrior] failed to set mem back %s", container.ID)
-				}
+				node.MemCap -= memory
+			} else {
+				node.MemCap += -memory
 			}
-			continue
-		}
-		// if memory was decreased, give it back when the update succeeds
-		if memory < 0 {
-			if err := c.store.UpdateNodeResource(ctx, podname, node.Name, types.CPUMap{}, -memory, "+"); err != nil {
-				log.Errorf("[doUpdateContainerWithMemoryPrior] failed to set mem back %s", container.ID)
+			// update container meta
+			container.Quota = newCPU
+			container.Memory = newMemory
+			if err := c.store.UpdateContainer(ctx, container); err != nil {
+				log.Warnf("[doUpdateContainerWithMemoryPrior] update container %s failed %v", container.ID, err)
+				ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+				continue
 			}
+			ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true}
 		}
-
-		container.Quota = newCPU
-		container.Memory = newMemory
-		if err := c.store.UpdateContainer(ctx, container); err != nil {
-			log.Errorf("[doUpdateContainerWithMemoryPrior] update container meta failed %v", err)
-			// abort immediately
-			ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+		if err := c.store.UpdateNode(ctx, node); err != nil {
+			log.Errorf("[doUpdateContainerWithMemoryPrior] update node %s failed %s", node.Name, err)
+			litter.Dump(node)
 			return
 		}
-		ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true}
 	}
 }
 
 func (c *Calcium) reallocContainersWithCPUPrior(
 	ctx context.Context,
 	ch chan *types.ReallocResourceMessage,
-	pod *types.Pod,
 	nodeContainers NodeContainers,
 	cpu float64, memory int64) {
@@ -212,7 +206,7 @@ func (c *Calcium) reallocContainersWithCPUPrior(
 		}
 	}
 
-	cpuMemNodesMap, err := c.reallocNodesCPUMem(ctx, pod.Name, cpuMemNodeContainers)
+	cpuMemNodesMap, err := c.reallocNodesCPUMem(ctx, cpuMemNodeContainers)
 	if err != nil {
 		log.Errorf("[reallocContainersWithCPUPrior] realloc cpu resource failed %v", err)
 		for _, memNodeMap := range cpuMemNodeContainers {
@@ -227,39 +221,22 @@ func (c *Calcium) reallocContainersWithCPUPrior(
 		return
 	}
 
-	// no more concurrency here
-	for newCPU, memNodeResult := range cpuMemNodesMap {
-		c.doReallocContainersWithCPUPrior(ctx, ch, newCPU, pod.Name, memNodeResult, cpuMemNodeContainers[newCPU])
-	}
+	c.doReallocContainersWithCPUPrior(ctx, ch, cpuMemNodesMap, cpuMemNodeContainers)
 }
 
 func (c *Calcium) reallocNodesCPUMem(
 	ctx context.Context,
-	podname string,
 	cpuMemNodeContainersInfo CPUMemNodeContainers,
 ) (CPUMemNodeContainersMap, error) {
-
-	lock, err := c.Lock(ctx, podname, c.config.LockTimeout)
-	if err != nil {
-		return nil, err
-	}
-	defer lock.Unlock(ctx)
-
-	// TODO too slow
+	// no actual node allocation here; the nodes are already locked, so only compute feasibility
 	cpuMemNodesMap := CPUMemNodeContainersMap{}
 	for requireCPU, memNodesContainers := range cpuMemNodeContainersInfo {
 		for requireMemory, nodesContainers := range memNodesContainers {
 			for node, containers := range nodesContainers {
 				// return the recorded CPU so it becomes available again
 				// return the recorded Mem so it becomes available again
-				// even with concurrent operations this does not affect create container
-				// worst case is overlapping CPU/MEM, which can be corrected externally
 				for _, container := range containers {
-					// update data in etcd
-					if err := c.store.UpdateNodeResource(ctx, podname, node.Name, container.CPU, container.Memory, "+"); err != nil {
-						return nil, err
-					}
-					// update node values
+					// compute in memory, do not update etcd
 					node.CPU.Add(container.CPU)
 					node.MemCap += container.Memory
 				}
@@ -279,12 +256,6 @@ func (c *Calcium) reallocNodesCPUMem(
 				// recalculate the requirement
 				nodesInfo, nodeCPUPlans, total, err := c.scheduler.SelectCPUNodes(nodesInfo, requireCPU, requireMemory)
 				if err != nil {
-					c.resetContainerResource(ctx, podname, node.Name, containers)
-					return nil, err
-				}
-				nodesInfo, err = c.scheduler.EachDivision(nodesInfo, need, 0)
-				if err != nil {
-					c.resetContainerResource(ctx, podname, node.Name, containers)
 					return nil, err
 				}
 				// there is only one node here, so a single-node solution is guaranteed
@@ -292,24 +263,13 @@ func (c *Calcium) reallocNodesCPUMem(
 					return nil, types.ErrInsufficientRes
 				}
 
-				cpuCost := types.CPUMap{}
-				memoryCost := requireMemory * int64(need)
-				cpuList := nodeCPUPlans[node.Name][:need]
-				for _, cpu := range cpuList {
-					cpuCost.Add(cpu)
-				}
-
-				// deduct the resources
-				if err := c.store.UpdateNodeResource(ctx, podname, node.Name, cpuCost, memoryCost, "-"); err != nil {
-					return nil, err
-				}
 				if _, ok := cpuMemNodesMap[requireCPU]; !ok {
 					cpuMemNodesMap[requireCPU] = map[int64]NodeCPUMap{}
 				}
 				if _, ok := cpuMemNodesMap[requireCPU][requireMemory]; !ok {
 					cpuMemNodesMap[requireCPU][requireMemory] = NodeCPUMap{}
 				}
-				cpuMemNodesMap[requireCPU][requireMemory][node] = cpuList
+				cpuMemNodesMap[requireCPU][requireMemory][node] = nodeCPUPlans[node.Name][:need]
 			}
 		}
 	}
@@ -319,46 +279,42 @@ func (c *Calcium) reallocNodesCPUMem(
 func (c *Calcium) doReallocContainersWithCPUPrior(
 	ctx context.Context,
 	ch chan *types.ReallocResourceMessage,
-	quota float64,
-	podname string,
-	memNodeResult map[int64]NodeCPUMap,
-	memNodeContainers map[int64]NodeContainers,
+	cpuMemNodesMap CPUMemNodeContainersMap,
+	cpuMemNodeContainers CPUMemNodeContainers,
 ) {
-
-	for requireMemory, nodesCPUResult := range memNodeResult {
-		nodeContainers := memNodeContainers[requireMemory]
-		for node, cpuset := range nodesCPUResult {
-			containers := nodeContainers[node]
-			for index, container := range containers {
-				cpuPlan := cpuset[index]
-				resource := makeResourceSetting(quota, requireMemory, cpuPlan, container.SoftLimit)
-				updateConfig := enginecontainer.UpdateConfig{Resources: resource}
-				if _, err := node.Engine.ContainerUpdate(ctx, container.ID, updateConfig); err != nil {
-					log.Errorf("[doReallocContainersWithCPUPrior] update container failed %v", err)
-					// TODO in theory the CPU allocation table could be restored here: we know both the new allocation and the old one
-					ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+	for newCPU, memNodeResult := range cpuMemNodesMap {
+		for newMem, nodesCPUResult := range memNodeResult {
+			nodeContainers := cpuMemNodeContainers[newCPU][newMem]
+			for node, cpuset := range nodesCPUResult {
+				containers := nodeContainers[node]
+				for index, container := range containers {
+					cpuPlan := cpuset[index]
+					resource := makeResourceSetting(newCPU, newMem, cpuPlan, container.SoftLimit)
+					updateConfig := enginecontainer.UpdateConfig{Resources: resource}
+					if _, err := node.Engine.ContainerUpdate(ctx, container.ID, updateConfig); err != nil {
+						log.Errorf("[doReallocContainersWithCPUPrior] update container failed %v", err)
+						ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+						continue
+					}
+					// record the change on success
+					node.CPU.Sub(cpuPlan)
+					node.MemCap -= newMem
+					container.CPU = cpuPlan
+					container.Quota = newCPU
+					container.Memory = newMem
+					if err := c.store.UpdateContainer(ctx, container); err != nil {
+						log.Warnf("[doReallocContainersWithCPUPrior] update container %s failed %v", container.ID, err)
+						ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+						continue
+					}
+					ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true}
 				}
-
-				container.CPU = cpuPlan
-				container.Quota = quota
-				container.Memory = requireMemory
-				if err := c.store.UpdateContainer(ctx, container); err != nil {
-					log.Errorf("[doReallocContainersWithCPUPrior] update container meta failed %v", err)
-					// abort immediately
-					ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
+				if err := c.store.UpdateNode(ctx, node); err != nil {
+					log.Errorf("[doReallocContainersWithCPUPrior] update node %s failed %s", node.Name, err)
+					litter.Dump(node)
 					return
 				}
-				ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: true}
 			}
 		}
 	}
 }
-
-func (c *Calcium) resetContainerResource(ctx context.Context, podname, nodename string, containers []*types.Container) error {
-	for _, container := range containers {
-		if err := c.store.UpdateNodeResource(ctx, podname, nodename, container.CPU, container.Memory, "-"); err != nil {
-			return err
-		}
-	}
-	return nil
-}
diff --git a/cluster/cluster.go b/cluster/cluster.go
index 914d43190..6247d82f5 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -36,6 +36,8 @@ const (
 	ContainerRestart = "restart"
 	// ContainerLock for lock container
 	ContainerLock = "clock_%s"
+	// NodeLock for lock node
+	NodeLock = "cnode_%s_%s"
 )
 
 // Cluster define all interface
diff --git a/store/etcdv3/container.go b/store/etcdv3/container.go
index d07edb883..9ac54dcd4 100644
--- a/store/etcdv3/container.go
+++ b/store/etcdv3/container.go
@@ -104,7 +104,7 @@ func (m *Mercury) GetContainer(ctx context.Context, ID string) (*types.Container, error) {
 		return nil, err
 	}
 
-	if c, err = m.bindNodeEngine(ctx, c); err != nil {
+	if c, err = m.bindEngine(ctx, c); err != nil {
 		return nil, err
 	}
 
@@ -170,7 +170,7 @@ func (m *Mercury) ListNodeContainers(ctx context.Context, nodename string) ([]*types.Container, error) {
 			return []*types.Container{}, err
 		}
 
-		if c, err = m.bindNodeEngine(ctx, c); err != nil {
+		if c, err = m.bindEngine(ctx, c); err != nil {
 			return []*types.Container{}, err
 		}
 
@@ -216,13 +216,12 @@ func (m *Mercury) WatchDeployStatus(ctx context.Context, appname, entrypoint, nodename string) chan *types.DeployStatus {
 	return ch
 }
 
-func (m *Mercury) bindNodeEngine(ctx context.Context, container *types.Container) (*types.Container, error) {
+func (m *Mercury) bindEngine(ctx context.Context, container *types.Container) (*types.Container, error) {
 	node, err := m.GetNode(ctx, container.Podname, container.Nodename)
 	if err != nil {
 		return nil, err
 	}
 	container.Engine = node.Engine
-	container.Node = node
 	return container, nil
 }
diff --git a/store/etcdv3/node.go b/store/etcdv3/node.go
index 32056caf0..174d0516c 100644
--- a/store/etcdv3/node.go
+++ b/store/etcdv3/node.go
@@ -8,6 +8,8 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/projecteru2/core/store"
+
 	"github.com/coreos/etcd/clientv3"
 	engineapi "github.com/docker/docker/client"
 	"github.com/projecteru2/core/metrics"
@@ -223,20 +225,6 @@ func (m *Mercury) GetAllNodes(ctx context.Context) ([]*types.Node, error) {
 // UpdateNode update a node, save it to etcd
 // storage path in etcd is `/pod/nodes/:podname/:nodename`
 func (m *Mercury) UpdateNode(ctx context.Context, node *types.Node) error {
-	lock, err := m.CreateLock(fmt.Sprintf("%s_%s", node.Podname, node.Name), m.config.LockTimeout)
-	if err != nil {
-		return err
-	}
-
-	if err := lock.Lock(ctx); err != nil {
-		return err
-	}
-	defer lock.Unlock(ctx)
-
-	return m.doUpdateNode(ctx, node)
-}
-
-func (m *Mercury) doUpdateNode(ctx context.Context, node *types.Node) error {
 	key := fmt.Sprintf(nodeInfoKey, node.Podname, node.Name)
 	bytes, err := json.Marshal(node)
 	if err != nil {
@@ -250,36 +238,20 @@ func (m *Mercury) doUpdateNode(ctx context.Context, node *types.Node) error {
 }
 
 // UpdateNodeResource update cpu and mem on a node, either add or substract
-// need to lock
-func (m *Mercury) UpdateNodeResource(ctx context.Context, podname, nodename string, cpu types.CPUMap, mem int64, action string) error {
-	lock, err := m.CreateLock(fmt.Sprintf("%s_%s", podname, nodename), m.config.LockTimeout)
-	if err != nil {
-		return err
-	}
-
-	if err := lock.Lock(ctx); err != nil {
-		return err
-	}
-	defer lock.Unlock(ctx)
-
-	node, err := m.GetNode(ctx, podname, nodename)
-	if err != nil {
-		return err
-	}
-
-	if action == "add" || action == "+" {
+func (m *Mercury) UpdateNodeResource(ctx context.Context, node *types.Node, cpu types.CPUMap, mem int64, action string) error {
+	switch action {
+	case store.ActionIncr:
 		node.CPU.Add(cpu)
 		node.MemCap += mem
-	} else if action == "sub" || action == "-" {
+	case store.ActionDecr:
 		node.CPU.Sub(cpu)
 		node.MemCap -= mem
-	} else {
+	default:
 		return types.ErrUnknownControlType
 	}
-	err = m.doUpdateNode(ctx, node)
 	go metrics.Client.SendNodeInfo(node)
-	return err
+	return m.UpdateNode(ctx, node)
 }
 
 func (m *Mercury) makeDockerClient(ctx context.Context, podname, nodename, endpoint string, force bool) (engineapi.APIClient, error) {
diff --git a/store/store.go b/store/store.go
index d533791b9..3ae400d01 100644
--- a/store/store.go
+++ b/store/store.go
@@ -9,9 +9,13 @@ import (
 
 const (
 	// PUTEVENT for put event
-	PUTEVENT = "PUT"
+	PutEvent = "PUT"
 	// DELETEEVENT for delete event
-	DELETEEVENT = "DELETE"
+	DeleteEvent = "DELETE"
+	// ActionIncr for increase resource
+	ActionIncr = "+"
+	// ActionDecr for decrease resource
+	ActionDecr = "-"
 )
 
 //Store store eru data
@@ -30,7 +34,7 @@ type Store interface {
 	GetNodesByPod(ctx context.Context, podname string) ([]*types.Node, error)
 	GetAllNodes(ctx context.Context) ([]*types.Node, error)
 	UpdateNode(ctx context.Context, node *types.Node) error
-	UpdateNodeResource(ctx context.Context, podname, nodename string, cpu types.CPUMap, mem int64, action string) error
+	UpdateNodeResource(ctx context.Context, node *types.Node, cpu types.CPUMap, mem int64, action string) error
 
 	// container
 	AddContainer(ctx context.Context, container *types.Container) error
diff --git a/types/container.go b/types/container.go
index 31ef89ce1..bf7a26d75 100644
--- a/types/container.go
+++ b/types/container.go
@@ -23,7 +23,6 @@ type Container struct {
 	Privileged bool                `json:"privileged"`
 	SoftLimit  bool                `json:"softlimit"`
 	Engine     engineapi.APIClient `json:"-"`
-	Node       *Node               `json:"-"`
 }
 
 // Inspect a container
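
Usage note (not part of the patch): the refactor concentrates all locking in cluster/calcium/lock.go and establishes a lock-everything-up-front pattern. A minimal sketch of how a caller is expected to use the new helpers, assuming it lives in the calcium package; the function name and log text are illustrative only:

```go
// exampleRealloc is a hypothetical caller: it locks every container in one
// shot and releases all locks with a single deferred UnlockAll, the same
// pattern ReallocResource now follows.
func (c *Calcium) exampleRealloc(ctx context.Context, IDs []string) error {
	containers, _, containerLocks, err := c.LockAndGetContainers(ctx, IDs)
	if err != nil {
		// nothing is left locked: LockAndGetContainers unlocks on failure
		return err
	}
	defer c.UnlockAll(ctx, containerLocks)
	for ID, container := range containers {
		log.Debugf("[exampleRealloc] container %s is on node %s", ID, container.Nodename)
	}
	return nil
}
```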
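Node resources follow the same convention: UpdateNodeResource no longer takes its own lock or re-reads the node from etcd; the caller is expected to hold the node lock (keyed by cluster.NodeLock, i.e. "cnode_%s_%s") and pass in the in-memory *types.Node, which is persisted once via UpdateNode. A sketch under those assumptions, presuming the calcium package also imports github.com/projecteru2/core/store; the function name and the cost parameters are placeholders:

```go
// exampleDeductNodeResource is a hypothetical caller showing the new
// convention: lock the node at the calcium layer, mutate the node object,
// persist exactly once.
func (c *Calcium) exampleDeductNodeResource(ctx context.Context, podname, nodename string, cpuCost types.CPUMap, memCost int64) error {
	// LockAndGetNode builds the lock name from cluster.NodeLock internally.
	node, nodeLock, err := c.LockAndGetNode(ctx, podname, nodename)
	if err != nil {
		return err
	}
	defer nodeLock.Unlock(ctx)
	// store.ActionDecr subtracts cpuCost/memCost from the locked node and
	// saves it via UpdateNode; no second lock, no re-read from etcd.
	return c.store.UpdateNodeResource(ctx, node, cpuCost, memCost, store.ActionDecr)
}
```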