Skip to content

Commit

Permalink
refactor unlock process
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Nov 23, 2018
1 parent ef10455 commit 0d6b592
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 27 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *Calcium) doCreateContainer(ctx context.Context, opts *types.DeployOptio
if err := c.store.UpdateNodeResource(ctx, node, m.CPU, opts.Memory, store.ActionIncr); err != nil {
log.Errorf("[doCreateContainer] Reset node %s failed %v", nodeInfo.Name, err)
}
nodeLock.Unlock(context.Background())
c.doUnlock(nodeLock, node.Name)
} else {
log.Warnf("[doCreateContainer] Container %s not removed", m.ContainerID)
}
Expand Down
29 changes: 10 additions & 19 deletions cluster/calcium/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,21 @@ func (c *Calcium) doLock(ctx context.Context, name string, timeout int) (lock.Di
return lock, nil
}

func (c *Calcium) doUnlockAll(ctx context.Context, locks map[string]lock.DistributedLock) {
for _, lock := range locks {
// doUnlock releases a single distributed lock.
// msg identifies the lock owner (container ID or node name) in the debug log.
// It returns whatever error the underlying Unlock reports; context.Background()
// is used deliberately so the unlock is not cut short by a cancelled caller ctx.
func (c *Calcium) doUnlock(lock lock.DistributedLock, msg string) error {
	// Tag matches the function name (was "[doUnlockNode]", a stale copy-paste).
	log.Debugf("[doUnlock] Unlock %s", msg)
	return lock.Unlock(context.Background())
}

func (c *Calcium) doUnlockAll(locks map[string]lock.DistributedLock) {
for n, lock := range locks {
// force unlock
if err := lock.Unlock(context.Background()); err != nil {
if err := c.doUnlock(lock, n); err != nil {
log.Errorf("[doUnlockAll] Unlock failed %v", err)
continue
}
}
}

func (c *Calcium) doUnlockAllContainers(locks map[string]lock.DistributedLock) {
for ID := range locks {
log.Debugf("[doUnlockAllContainers] Unlock container %s", ID)
}
c.doUnlockAll(context.Background(), locks)
}

func (c *Calcium) doUnlockAllNodes(locks map[string]lock.DistributedLock) {
for nodename := range locks {
log.Debugf("[doUnlockAllNodes] Unlock node %s", nodename)
}
c.doUnlockAll(context.Background(), locks)
}

func (c *Calcium) doLockAndGetContainer(ctx context.Context, ID string) (*types.Container, enginetypes.ContainerJSON, lock.DistributedLock, error) {
lock, err := c.doLock(ctx, fmt.Sprintf(cluster.ContainerLock, ID), c.config.LockTimeout)
if err != nil {
Expand Down Expand Up @@ -74,7 +65,7 @@ func (c *Calcium) doLockAndGetContainers(ctx context.Context, IDs []string) (map
for _, ID := range IDs {
container, containerJSON, lock, err := c.doLockAndGetContainer(ctx, ID)
if err != nil {
c.doUnlockAllContainers(locks)
c.doUnlockAll(locks)
return nil, nil, nil, err
}
containers[ID] = container
Expand Down Expand Up @@ -131,7 +122,7 @@ func (c *Calcium) doLockAndGetNodes(ctx context.Context, podname, nodename strin
for _, n := range ns {
node, lock, err := c.doLockAndGetNode(ctx, podname, n.Name)
if err != nil {
c.doUnlockAllNodes(locks)
c.doUnlockAll(locks)
return nil, nil, err
}
nodes[node.Name] = node
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/realloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func (c *Calcium) ReallocResource(ctx context.Context, IDs []string, cpu float64
log.Errorf("[ReallocResource] Lock and get containers failed %v", err)
return
}
defer c.doUnlockAllContainers(containerLocks)
defer c.doUnlockAll(containerLocks)
// Pod-Node-Containers
containersInfo := map[*types.Pod]nodeContainers{}
// Pod cache
podCache := map[string]*types.Pod{}
// Node locks
nodeLocks := map[string]lock.DistributedLock{}
defer c.doUnlockAllNodes(nodeLocks)
defer c.doUnlockAll(nodeLocks)
// Node cache
nodeCache := map[string]*types.Node{}

Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (c *Calcium) RemoveContainer(ctx context.Context, IDs []string, force bool)
go func(container *types.Container, containerJSON enginetypes.ContainerJSON, containerLock lock.DistributedLock) {
defer wg.Done()
// force to unlock
defer containerLock.Unlock(context.Background())
defer c.doUnlock(containerLock, container.ID)
message := ""
success := false

Expand All @@ -50,7 +50,7 @@ func (c *Calcium) RemoveContainer(ctx context.Context, IDs []string, force bool)
if err != nil {
return
}
defer nodeLock.Unlock(context.Background())
defer c.doUnlock(nodeLock, node.Name)

message, err = c.doStopAndRemoveContainer(ctx, container, containerJSON, ib, force)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (c *Calcium) ReplaceContainer(ctx context.Context, opts *types.ReplaceOptio
}
if opts.Podname != "" && container.Podname != opts.Podname {
log.Debugf("[ReplaceContainer] Skip not in pod container %s", container.ID)
containerLock.Unlock(context.Background())
c.doUnlock(containerLock, container.ID)
continue
}

Expand All @@ -51,7 +51,7 @@ func (c *Calcium) ReplaceContainer(ctx context.Context, opts *types.ReplaceOptio

go func(replaceOpts types.ReplaceOptions, container *types.Container, containerJSON enginetypes.ContainerJSON, containerLock lock.DistributedLock, index int) {
defer wg.Done()
defer containerLock.Unlock(context.Background())
defer c.doUnlock(containerLock, container.ID)
// 使用复制之后的配置
// 停老的,起新的
replaceOpts.Memory = container.Memory
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (c *Calcium) doAllocResource(ctx context.Context, opts *types.DeployOptions
if err != nil {
return nil, err
}
defer c.doUnlockAllNodes(nodeLocks)
defer c.doUnlockAll(nodeLocks)

cpuandmem := makeCPUAndMem(nodes)
nodesInfo = getNodesInfo(cpuandmem)
Expand Down

0 comments on commit 0d6b592

Please sign in to comment.