Skip to content

Commit

Permalink
refactor storage interface with context
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Jun 28, 2018
1 parent 152a0fe commit 455e478
Show file tree
Hide file tree
Showing 18 changed files with 455 additions and 443 deletions.
8 changes: 4 additions & 4 deletions cluster/calcium/create_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (

//CreateContainer use options to create containers
func (c *Calcium) CreateContainer(opts *types.DeployOptions) (chan *types.CreateContainerMessage, error) {
pod, err := c.store.GetPod(opts.Podname)
pod, err := c.store.GetPod(context.Background(), opts.Podname)
if err != nil {
log.Errorf("[CreateContainer] Error during GetPod for %s: %v", opts.Podname, err)
return nil, err
Expand Down Expand Up @@ -87,7 +87,7 @@ func (c *Calcium) doCreateContainerWithMemoryPrior(nodeInfo types.NodeInfo, opts
for i := 0; i < nodeInfo.Deploy; i++ {
ms[i] = &types.CreateContainerMessage{Error: err}
if !opts.RawResource {
if err := c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, opts.Memory, "+"); err != nil {
if err := c.store.UpdateNodeMem(context.Background(), opts.Podname, nodeInfo.Name, opts.Memory, "+"); err != nil {
log.Errorf("[doCreateContainerWithMemoryPrior] reset node memory failed %v", err)
}
}
Expand Down Expand Up @@ -153,7 +153,7 @@ func (c *Calcium) doCreateContainerWithCPUPrior(nodeName string, cpuMap []types.
log.Errorf("[doCreateContainerWithCPUPrior] Get and prepare node error %v", err)
for i := 0; i < deployCount; i++ {
ms[i] = &types.CreateContainerMessage{Error: err}
if err := c.store.UpdateNodeCPU(opts.Podname, nodeName, cpuMap[i], "+"); err != nil {
if err := c.store.UpdateNodeCPU(context.Background(), opts.Podname, nodeName, cpuMap[i], "+"); err != nil {
log.Errorf("[doCreateContainerWithCPUPrior] update node CPU failed %v", err)
}
}
Expand Down Expand Up @@ -390,7 +390,7 @@ func (c *Calcium) createAndStartContainer(
container.ID = containerCreated.ID
createContainerMessage.ContainerID = container.ID

if err = c.store.AddContainer(container); err != nil {
if err = c.store.AddContainer(context.Background(), container); err != nil {
createContainerMessage.Error = err
return createContainerMessage
}
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/create_container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
func TestPullImage(t *testing.T) {
initMockConfig()

nodes, err := mockc.store.GetAllNodes()
nodes, err := mockc.store.GetAllNodes(context.Background())
if err != nil || len(nodes) == 0 {
t.Fatal(err)
}
Expand Down Expand Up @@ -66,7 +66,7 @@ func TestClean(t *testing.T) {
initMockConfig()

// delete pod, which will fail because there are remaining nodes
err := mockc.store.RemovePod(podname)
err := mockc.store.RemovePod(context.Background(), podname)
assert.Error(t, err)
assert.Contains(t, err.Error(), "still has nodes")
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (c *Calcium) Lock(name string, timeout int) (lock.DistributedLock, error) {
if err != nil {
return nil, err
}
if err = lock.Lock(); err != nil {
if err = lock.Lock(context.Background()); err != nil {
return nil, err
}
return lock, nil
Expand Down
40 changes: 21 additions & 19 deletions cluster/calcium/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,72 +4,74 @@ package calcium
// All these functions are meta data related.

import (
"context"

"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)

//ListPods show pods
func (c *Calcium) ListPods() ([]*types.Pod, error) {
return c.store.GetAllPods()
return c.store.GetAllPods(context.Background())
}

//AddPod add pod
func (c *Calcium) AddPod(podname, favor, desc string) (*types.Pod, error) {
return c.store.AddPod(podname, favor, desc)
return c.store.AddPod(context.Background(), podname, favor, desc)
}

//RemovePod remove pod
func (c *Calcium) RemovePod(podname string) error {
return c.store.RemovePod(podname)
return c.store.RemovePod(context.Background(), podname)
}

//GetPod get one pod
func (c *Calcium) GetPod(podname string) (*types.Pod, error) {
return c.store.GetPod(podname)
return c.store.GetPod(context.Background(), podname)
}

//AddNode add a node in pod
func (c *Calcium) AddNode(nodename, endpoint, podname, ca, cert, key string, cpu int, share, memory int64, labels map[string]string) (*types.Node, error) {
return c.store.AddNode(nodename, endpoint, podname, ca, cert, key, cpu, share, memory, labels)
return c.store.AddNode(context.Background(), nodename, endpoint, podname, ca, cert, key, cpu, share, memory, labels)
}

//GetNode get node
func (c *Calcium) GetNode(podname, nodename string) (*types.Node, error) {
return c.store.GetNode(podname, nodename)
return c.store.GetNode(context.Background(), podname, nodename)
}

//GetNodeByName get node by name
func (c *Calcium) GetNodeByName(nodename string) (*types.Node, error) {
return c.store.GetNodeByName(nodename)
return c.store.GetNodeByName(context.Background(), nodename)
}

//SetNodeAvailable set node available or not
func (c *Calcium) SetNodeAvailable(podname, nodename string, available bool) (*types.Node, error) {
n, err := c.store.GetNode(podname, nodename)
n, err := c.store.GetNode(context.Background(), podname, nodename)
if err != nil {
return nil, err
}
n.Available = available
if err := c.store.UpdateNode(n); err != nil {
if err := c.store.UpdateNode(context.Background(), n); err != nil {
return nil, err
}
return n, nil
}

//RemoveNode remove a node
func (c *Calcium) RemoveNode(nodename, podname string) (*types.Pod, error) {
n, err := c.store.GetNode(podname, nodename)
n, err := c.store.GetNode(context.Background(), podname, nodename)
if err != nil {
return nil, err
}
c.store.DeleteNode(n)
return c.store.GetPod(podname)
c.store.DeleteNode(context.Background(), n)
return c.store.GetPod(context.Background(), podname)
}

//ListPodNodes list nodes belong to pod
func (c *Calcium) ListPodNodes(podname string, all bool) ([]*types.Node, error) {
var nodes []*types.Node
candidates, err := c.store.GetNodesByPod(podname)
candidates, err := c.store.GetNodesByPod(context.Background(), podname)
if err != nil {
log.Debugf("[ListPodNodes] Error during ListPodNodes from %s: %v", podname, err)
return nodes, err
Expand All @@ -83,21 +85,21 @@ func (c *Calcium) ListPodNodes(podname string, all bool) ([]*types.Node, error)
}

//GetContainer get a container
func (c *Calcium) GetContainer(id string) (*types.Container, error) {
return c.store.GetContainer(id)
func (c *Calcium) GetContainer(ID string) (*types.Container, error) {
return c.store.GetContainer(context.Background(), ID)
}

//GetContainers get containers
func (c *Calcium) GetContainers(ids []string) ([]*types.Container, error) {
return c.store.GetContainers(ids)
func (c *Calcium) GetContainers(IDs []string) ([]*types.Container, error) {
return c.store.GetContainers(context.Background(), IDs)
}

//ContainerDeployed show container deploy status
func (c *Calcium) ContainerDeployed(ID, appname, entrypoint, nodename, data string) error {
return c.store.ContainerDeployed(ID, appname, entrypoint, nodename, data)
return c.store.ContainerDeployed(context.Background(), ID, appname, entrypoint, nodename, data)
}

//ListContainers list containers
func (c *Calcium) ListContainers(appname, entrypoint, nodename string) ([]*types.Container, error) {
return c.store.ListContainers(appname, entrypoint, nodename)
return c.store.ListContainers(context.Background(), appname, entrypoint, nodename)
}
27 changes: 14 additions & 13 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package calcium

import (
"context"
"fmt"
"sync"

Expand All @@ -11,9 +12,9 @@ import (
)

//ReallocResource allow realloc container resource
func (c *Calcium) ReallocResource(ids []string, cpu float64, mem int64) (chan *types.ReallocResourceMessage, error) {
func (c *Calcium) ReallocResource(IDs []string, cpu float64, mem int64) (chan *types.ReallocResourceMessage, error) {
// TODO 大量容器 Get 的时候有性能问题
containers, err := c.store.GetContainers(ids)
containers, err := c.store.GetContainers(context.Background(), IDs)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -44,7 +45,7 @@ func (c *Calcium) ReallocResource(ids []string, cpu float64, mem int64) (chan *t
continue
}

pod, err := c.store.GetPod(container.Podname)
pod, err := c.store.GetPod(context.Background(), container.Podname)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -113,12 +114,12 @@ func (c *Calcium) checkNodesMemory(podname string, nodeContainers NodeContainers
if err != nil {
return err
}
defer lock.Unlock()
defer lock.Unlock(context.Background())
for node, containers := range nodeContainers {
if cap := int(node.MemCap / memory); cap < len(containers) {
return fmt.Errorf("Not enough resource %s", node.Name)
}
if err := c.store.UpdateNodeMem(podname, node.Name, int64(len(containers))*memory, "-"); err != nil {
if err := c.store.UpdateNodeMem(context.Background(), podname, node.Name, int64(len(containers))*memory, "-"); err != nil {
return err
}
}
Expand Down Expand Up @@ -183,21 +184,21 @@ func (c *Calcium) doUpdateContainerWithMemoryPrior(
ch <- &types.ReallocResourceMessage{ContainerID: containerJSON.ID, Success: false}
// 如果是增加内存,失败的时候应该把内存还回去
if memory > 0 && !container.RawResource {
if err := c.store.UpdateNodeMem(podname, node.Name, memory, "+"); err != nil {
if err := c.store.UpdateNodeMem(context.Background(), podname, node.Name, memory, "+"); err != nil {
log.Errorf("[doUpdateContainerWithMemoryPrior] failed to set mem back %s", containerJSON.ID)
}
}
continue
}
// 如果是要降低内存,当执行成功的时候需要把内存还回去
if memory < 0 && !container.RawResource {
if err := c.store.UpdateNodeMem(podname, node.Name, -memory, "+"); err != nil {
if err := c.store.UpdateNodeMem(context.Background(), podname, node.Name, -memory, "+"); err != nil {
log.Errorf("[doUpdateContainerWithMemoryPrior] failed to set mem back %s", containerJSON.ID)
}
}

container.Memory = newMemory
if err := c.store.AddContainer(container); err != nil {
if err := c.store.AddContainer(context.Background(), container); err != nil {
log.Errorf("[doUpdateContainerWithMemoryPrior] update container meta failed %v", err)
// 立即中断
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
Expand All @@ -216,7 +217,7 @@ func (c *Calcium) reallocNodesCPU(
if err != nil {
return nil, err
}
defer lock.Unlock()
defer lock.Unlock(context.Background())

// TODO too slow
nodesCPUMap := CPUNodeContainersMap{}
Expand All @@ -226,7 +227,7 @@ func (c *Calcium) reallocNodesCPU(
// 把 CPU 还回去,变成新的可用资源
// 即便有并发操作,不影响 Create 操作
// 最坏情况就是 CPU 重叠了,可以外部纠正
if err := c.store.UpdateNodeCPU(podname, node.Name, container.CPU, "+"); err != nil {
if err := c.store.UpdateNodeCPU(context.Background(), podname, node.Name, container.CPU, "+"); err != nil {
return nil, err
}
node.CPU.Add(container.CPU)
Expand All @@ -247,7 +248,7 @@ func (c *Calcium) reallocNodesCPU(
result, changed, err := c.scheduler.SelectCPUNodes(nodesInfo, cpu, containersNum)
if err != nil {
for _, container := range containers {
if err := c.store.UpdateNodeCPU(podname, node.Name, container.CPU, "-"); err != nil {
if err := c.store.UpdateNodeCPU(context.Background(), podname, node.Name, container.CPU, "-"); err != nil {
return nil, err
}
}
Expand All @@ -258,7 +259,7 @@ func (c *Calcium) reallocNodesCPU(
containersCPUMap, hasResult := result[node.Name]
if isChanged && hasResult {
node.CPU = nodeCPUMap
if err := c.store.UpdateNode(node); err != nil {
if err := c.store.UpdateNode(context.Background(), node); err != nil {
return nil, err
}
if _, ok := nodesCPUMap[cpu]; !ok {
Expand Down Expand Up @@ -318,7 +319,7 @@ func (c *Calcium) doReallocContainersWithCPUPrior(
}

container.CPU = quota
if err := c.store.AddContainer(container); err != nil {
if err := c.store.AddContainer(context.Background(), container); err != nil {
log.Errorf("[doReallocContainersWithCPUPrior] update container meta failed %v", err)
// 立即中断
ch <- &types.ReallocResourceMessage{ContainerID: container.ID, Success: false}
Expand Down
8 changes: 4 additions & 4 deletions cluster/calcium/remove_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,14 +109,14 @@ func (c *Calcium) removeOneContainer(container *types.Container) error {
// but if it's 0, just ignore to save 1 time write on etcd.
if container.CPU.Total() > 0 {
log.Debugf("[removeOneContainer] Restore node %s cpu: %v", container.Nodename, container.CPU)
if err := c.store.UpdateNodeCPU(container.Podname, container.Nodename, container.CPU, "+"); err != nil {
if err := c.store.UpdateNodeCPU(context.Background(), container.Podname, container.Nodename, container.CPU, "+"); err != nil {
log.Errorf("[removeOneContainer] Update Node CPU failed %v", err)
}
return
}
if container.Memory > 0 {
log.Debugf("[removeOneContainer] Restore node %s memory: %d", container.Nodename, container.Memory)
if err := c.store.UpdateNodeMem(container.Podname, container.Nodename, container.Memory, "+"); err != nil {
if err := c.store.UpdateNodeMem(context.Background(), container.Podname, container.Nodename, container.Memory, "+"); err != nil {
log.Errorf("[removeOneContainer] Update Node Memory failed %v", err)
}
}
Expand All @@ -134,7 +134,7 @@ func (c *Calcium) removeOneContainer(container *types.Container) error {
if err != nil {
return err
}
defer lock.Unlock()
defer lock.Unlock(context.Background())

// 这里 block 的问题很严重,按照目前的配置是 5 分钟一级的 block
// 一个简单的处理方法是相信 ctx 不相信 docker 自身的处理
Expand All @@ -156,7 +156,7 @@ func (c *Calcium) removeOneContainer(container *types.Container) error {
}
log.Debugf("[removeOneContainer] Container removed %s", container.ID)

return c.store.RemoveContainer(container)
return c.store.RemoveContainer(context.Background(), container)
}

// 同步地删除容器, 在某些需要等待的场合异常有用!
Expand Down
13 changes: 7 additions & 6 deletions cluster/calcium/resource.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package calcium

import (
"context"
"fmt"
"sort"
"sync"
Expand All @@ -17,7 +18,7 @@ func (c *Calcium) allocMemoryPodResource(opts *types.DeployOptions) ([]types.Nod
if err != nil {
return nil, err
}
defer lock.Unlock()
defer lock.Unlock(context.Background())

cpuandmem, _, err := c.getCPUAndMem(opts.Podname, opts.Nodename, opts.NodeLabels)
if err != nil {
Expand All @@ -27,7 +28,7 @@ func (c *Calcium) allocMemoryPodResource(opts *types.DeployOptions) ([]types.Nod
nodesInfo := getNodesInfo(cpuandmem)

// Load deploy status
nodesInfo, err = c.store.MakeDeployStatus(opts, nodesInfo)
nodesInfo, err = c.store.MakeDeployStatus(context.Background(), opts, nodesInfo)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -58,7 +59,7 @@ func (c *Calcium) allocMemoryPodResource(opts *types.DeployOptions) ([]types.Nod
go func(nodeInfo types.NodeInfo) {
defer wg.Done()
memoryTotal := opts.Memory * int64(nodeInfo.Deploy)
c.store.UpdateNodeMem(opts.Podname, nodeInfo.Name, memoryTotal, "-")
c.store.UpdateNodeMem(context.Background(), opts.Podname, nodeInfo.Name, memoryTotal, "-")
}(nodeInfo)
}
wg.Wait()
Expand All @@ -70,7 +71,7 @@ func (c *Calcium) allocCPUPodResource(opts *types.DeployOptions) (map[string][]t
if err != nil {
return nil, err
}
defer lock.Unlock()
defer lock.Unlock(context.Background())

cpuandmem, nodes, err := c.getCPUAndMem(opts.Podname, opts.Nodename, opts.NodeLabels)
if err != nil {
Expand All @@ -79,7 +80,7 @@ func (c *Calcium) allocCPUPodResource(opts *types.DeployOptions) (map[string][]t
nodesInfo := getNodesInfo(cpuandmem)

// Load deploy status
nodesInfo, err = c.store.MakeDeployStatus(opts, nodesInfo)
nodesInfo, err = c.store.MakeDeployStatus(context.Background(), opts, nodesInfo)
if err != nil {
return nil, err
}
Expand All @@ -102,7 +103,7 @@ func (c *Calcium) allocCPUPodResource(opts *types.DeployOptions) (map[string][]t
if ok {
node.CPU = r
// ignore error
c.store.UpdateNode(node)
c.store.UpdateNode(context.Background(), node)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/run_and_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, std

// 找不到对应node也不管
// 理论上不会这样
node, err := c.store.GetNode(message.Podname, message.Nodename)
node, err := c.store.GetNode(context.Background(), message.Podname, message.Nodename)
if err != nil {
log.Errorf("[RunAndWait] Can't find node, %v", err)
continue
Expand Down
Loading

0 comments on commit 455e478

Please sign in to comment.