Skip to content

Commit

Permalink
refactor lock and update container process
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Nov 21, 2018
1 parent 51901f8 commit 532c5b9
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 32 deletions.
21 changes: 21 additions & 0 deletions cluster/calcium/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package calcium

import (
"context"
"fmt"
"sync"

enginetypes "github.com/docker/docker/api/types"
"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/lock"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -132,3 +134,22 @@ func (c *Calcium) doRemoveContainer(ctx context.Context, container *types.Contai

return c.store.RemoveContainer(ctx, container)
}

func (c *Calcium) doLockContainer(ctx context.Context, container *types.Container) (*types.Container, enginetypes.ContainerJSON, lock.DistributedLock, error) {
lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, container.ID), int(c.config.GlobalTimeout.Seconds()))
if err != nil {
return nil, enginetypes.ContainerJSON{}, nil, err
}
// 确保是有这个容器的
containerJSON, err := container.Inspect(ctx)
if err != nil {
return nil, enginetypes.ContainerJSON{}, nil, err
}
// 更新容器元信息
// 可能在等锁的过程中被 realloc 了
container, err = c.store.GetContainer(ctx, container.ID)
if err != nil {
return nil, enginetypes.ContainerJSON{}, nil, err
}
return container, containerJSON, lock, err
}
22 changes: 7 additions & 15 deletions cluster/calcium/remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package calcium

import (
"context"
"fmt"
"sync"

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/types"
log "github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -36,7 +34,7 @@ func (c *Calcium) RemoveContainer(ctx context.Context, IDs []string, force bool)
go func(container *types.Container) {
defer wg.Done()

message, err := c.doStopAndRemoveContainer(ctx, container, ib, force)
container, message, err := c.doStopAndRemoveContainer(ctx, container, ib, force)
success := false

defer func() {
Expand All @@ -63,35 +61,29 @@ func (c *Calcium) RemoveContainer(ctx context.Context, IDs []string, force bool)
wg.Wait()

// 把收集的image清理掉
//TODO 如果 remove 是异步的,这里就不能用 ctx 了,gRPC 一断这里就会死
// TODO 如果 remove 是异步的,这里就不能用 ctx 了,gRPC 一断这里就会死
go c.cleanCachedImage(ctx, ib)
}()
return ch, nil
}

func (c *Calcium) doStopAndRemoveContainer(ctx context.Context, container *types.Container, ib *imageBucket, force bool) (string, error) {
lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, container.ID), int(c.config.GlobalTimeout.Seconds()))
func (c *Calcium) doStopAndRemoveContainer(ctx context.Context, container *types.Container, ib *imageBucket, force bool) (*types.Container, string, error) {
container, containerJSON, lock, err := c.doLockContainer(ctx, container)
if err != nil {
return err.Error(), err
return nil, err.Error(), err
}
defer lock.Unlock(ctx)

// 确保是有这个容器的
containerJSON, err := container.Inspect(ctx)
if err != nil {
return err.Error(), err
}

var message string
message, err = c.doStopContainer(ctx, container, containerJSON, ib, force)
if err != nil {
return message, err
return nil, message, err
}

if err = c.doRemoveContainer(ctx, container); err != nil {
message += err.Error()
}
return message, err
return container, message, err
}

func (c *Calcium) cleanCachedImage(ctx context.Context, ib *imageBucket) {
Expand Down
18 changes: 1 addition & 17 deletions cluster/calcium/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package calcium

import (
"context"
"fmt"
"sync"

"github.com/projecteru2/core/cluster"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -92,24 +90,15 @@ func (c *Calcium) doReplaceContainer(
Success: false,
Message: "",
}

// 锁住,防止删除
lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, container.ID), int(c.config.GlobalTimeout.Seconds()))
container, containerJSON, lock, err := c.doLockContainer(ctx, container)
if err != nil {
return nil, removeMessage, err
}
defer lock.Unlock(ctx)

// 确保是有这个容器的
containerJSON, err := container.Inspect(ctx)
if err != nil {
return nil, removeMessage, err
}

if !utils.FilterContainer(containerJSON.Config.Labels, opts.FilterLabels) {
return nil, removeMessage, types.ErrNotFitLabels
}

// 拉镜像
auth, err := makeEncodedAuthConfigFromRemote(c.config.Docker.AuthConfigs, opts.Image)
if err != nil {
Expand All @@ -119,13 +108,11 @@ func (c *Calcium) doReplaceContainer(
if err = pullImage(ctx, container.Node, opts.Image, auth); err != nil {
return nil, removeMessage, err
}

// 停止容器
removeMessage.Message, err = c.doStopContainer(ctx, container, containerJSON, ib, opts.Force)
if err != nil {
return nil, removeMessage, err
}

// 获得文件 io
for src, dst := range opts.Copy {
stream, _, err := container.Engine.CopyFromContainer(ctx, container.ID, src)
Expand All @@ -138,7 +125,6 @@ func (c *Calcium) doReplaceContainer(
}
opts.DeployOptions.Data[dst] = fname
}

// 不涉及资源消耗,创建容器失败会被回收容器而不回收资源
// 创建成功容器会干掉之前的老容器也不会动资源,实际上实现了动态捆绑
createMessage := c.createAndStartContainer(ctx, index, container.Node, &opts.DeployOptions, container.CPU)
Expand All @@ -152,13 +138,11 @@ func (c *Calcium) doReplaceContainer(
}
return nil, removeMessage, createMessage.Error
}

// 干掉老的
if err = c.doRemoveContainer(ctx, container); err != nil {
log.Errorf("[replaceAndRemove] Old container %s remove failed %v", container.ID, err)
return createMessage, removeMessage, err
}

removeMessage.Success = true
return createMessage, removeMessage, nil
}

0 comments on commit 532c5b9

Please sign in to comment.