Skip to content

Commit

Permalink
rewrite lock process
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Nov 23, 2018
1 parent ffaf8a5 commit ed8e65d
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 248 deletions.
24 changes: 0 additions & 24 deletions cluster/calcium/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package calcium

import (
"context"
"fmt"
"sync"

enginetypes "github.com/docker/docker/api/types"
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/lock"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -134,25 +132,3 @@ func (c *Calcium) doRemoveContainer(ctx context.Context, container *types.Contai

return c.store.RemoveContainer(ctx, container)
}

func (c *Calcium) doLockContainer(ctx context.Context, container *types.Container) (*types.Container, enginetypes.ContainerJSON, lock.DistributedLock, error) {
lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, container.ID), int(c.config.GlobalTimeout.Seconds()))
if err != nil {
return container, enginetypes.ContainerJSON{}, nil, err
}
log.Debugf("[doLockContainer] Container %s locked", container.ID)
// 确保是有这个容器的
containerJSON, err := container.Inspect(ctx)
if err != nil {
lock.Unlock(ctx)
return container, enginetypes.ContainerJSON{}, nil, err
}
// 更新容器元信息
// 可能在等锁的过程中被 realloc 了
rContainer, err := c.store.GetContainer(ctx, container.ID)
if err != nil {
lock.Unlock(ctx)
return container, enginetypes.ContainerJSON{}, nil, err
}
return rContainer, containerJSON, lock, nil
}
13 changes: 0 additions & 13 deletions cluster/calcium/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,12 @@ import (
engineapi "github.com/docker/docker/client"
"github.com/docker/docker/registry"
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/lock"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)

// Lock is lock for calcium
func (c *Calcium) Lock(ctx context.Context, name string, timeout int) (lock.DistributedLock, error) {
lock, err := c.store.CreateLock(name, timeout)
if err != nil {
return nil, err
}
if err = lock.Lock(ctx); err != nil {
return nil, err
}
return lock, nil
}

func makeResourceSetting(cpu float64, memory int64, cpuMap types.CPUMap, softlimit bool) enginecontainer.Resources {
resource := enginecontainer.Resources{}
if cpu > 0 {
Expand Down
89 changes: 89 additions & 0 deletions cluster/calcium/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package calcium

import (
"context"
"fmt"

enginetypes "github.com/docker/docker/api/types"
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/lock"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)

// Lock is lock for calcium
func (c *Calcium) Lock(ctx context.Context, name string, timeout int) (lock.DistributedLock, error) {
lock, err := c.store.CreateLock(name, timeout)
if err != nil {
return nil, err
}
if err = lock.Lock(ctx); err != nil {
return nil, err
}
return lock, nil
}

// UnlockAll unlock all locks
func (c *Calcium) UnlockAll(ctx context.Context, locks map[string]lock.DistributedLock) {
for _, lock := range locks {
if err := lock.Unlock(ctx); err != nil {
log.Errorf("[UnlockAll] Unlock failed %v", err)
}
}
}

// LockAndGetContainers lock and get containers
func (c *Calcium) LockAndGetContainers(ctx context.Context, IDs []string) (map[string]*types.Container, map[string]enginetypes.ContainerJSON, map[string]lock.DistributedLock, error) {
containers := map[string]*types.Container{}
containerJSONs := map[string]enginetypes.ContainerJSON{}
locks := map[string]lock.DistributedLock{}
for _, ID := range IDs {
container, containerJSON, lock, err := c.LockAndGetContainer(ctx, ID)
if err != nil {
c.UnlockAll(ctx, locks)
return nil, nil, nil, err
}
containers[ID] = container
containerJSONs[ID] = containerJSON
locks[ID] = lock
}
return containers, containerJSONs, locks, nil
}

// LockAndGetContainer lock and get container
func (c *Calcium) LockAndGetContainer(ctx context.Context, ID string) (*types.Container, enginetypes.ContainerJSON, lock.DistributedLock, error) {
lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, ID), c.config.LockTimeout)
if err != nil {
return nil, enginetypes.ContainerJSON{}, nil, err
}
log.Debugf("[LockAndGetContainer] Container %s locked", ID)
// Get container
container, err := c.store.GetContainer(ctx, ID)
if err != nil {
lock.Unlock(ctx)
return nil, enginetypes.ContainerJSON{}, nil, err
}
// 确保是有这个容器的
containerJSON, err := container.Inspect(ctx)
if err != nil {
lock.Unlock(ctx)
return nil, enginetypes.ContainerJSON{}, nil, err
}
return container, containerJSON, lock, nil
}

// LockAndGetNode lock and get node
func (c *Calcium) LockAndGetNode(ctx context.Context, podname, nodename string) (*types.Node, lock.DistributedLock, error) {
lock, err := c.Lock(ctx, fmt.Sprintf(cluster.NodeLock, podname, nodename), c.config.LockTimeout)
if err != nil {
return nil, nil, err
}
log.Debugf("[LockAndGetNode] Node %s locked", nodename)
// Get node
node, err := c.GetNode(ctx, podname, nodename)
if err != nil {
lock.Unlock(ctx)
return nil, nil, err
}
return node, lock, nil
}
Loading

0 comments on commit ed8e65d

Please sign in to comment.