diff --git a/cluster/calcium/control.go b/cluster/calcium/control.go index 24efe7aeb..224b04c2f 100644 --- a/cluster/calcium/control.go +++ b/cluster/calcium/control.go @@ -2,10 +2,12 @@ package calcium import ( "context" + "fmt" "sync" enginetypes "github.com/docker/docker/api/types" "github.com/projecteru2/core/cluster" + "github.com/projecteru2/core/lock" "github.com/projecteru2/core/types" log "github.com/sirupsen/logrus" ) @@ -132,3 +134,22 @@ func (c *Calcium) doRemoveContainer(ctx context.Context, container *types.Contai return c.store.RemoveContainer(ctx, container) } + +func (c *Calcium) doLockContainer(ctx context.Context, container *types.Container) (*types.Container, enginetypes.ContainerJSON, lock.DistributedLock, error) { + lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, container.ID), int(c.config.GlobalTimeout.Seconds())) + if err != nil { + return nil, enginetypes.ContainerJSON{}, nil, err + } + // 确保是有这个容器的 + containerJSON, err := container.Inspect(ctx) + if err != nil { + return nil, enginetypes.ContainerJSON{}, nil, err + } + // 更新容器元信息 + // 可能在等锁的过程中被 realloc 了 + container, err = c.store.GetContainer(ctx, container.ID) + if err != nil { + return nil, enginetypes.ContainerJSON{}, nil, err + } + return container, containerJSON, lock, err +} diff --git a/cluster/calcium/remove.go b/cluster/calcium/remove.go index fe237188e..dda220fbf 100644 --- a/cluster/calcium/remove.go +++ b/cluster/calcium/remove.go @@ -2,10 +2,8 @@ package calcium import ( "context" - "fmt" "sync" - "github.com/projecteru2/core/cluster" "github.com/projecteru2/core/types" log "github.com/sirupsen/logrus" ) @@ -36,7 +34,7 @@ func (c *Calcium) RemoveContainer(ctx context.Context, IDs []string, force bool) go func(container *types.Container) { defer wg.Done() - message, err := c.doStopAndRemoveContainer(ctx, container, ib, force) + container, message, err := c.doStopAndRemoveContainer(ctx, container, ib, force) success := false defer func() { @@ -63,35 +61,29 @@ func (c *Calcium) RemoveContainer(ctx context.Context, IDs []string, force bool) wg.Wait() // 把收集的image清理掉 - //TODO 如果 remove 是异步的,这里就不能用 ctx 了,gRPC 一断这里就会死 + // TODO 如果 remove 是异步的,这里就不能用 ctx 了,gRPC 一断这里就会死 go c.cleanCachedImage(ctx, ib) }() return ch, nil } -func (c *Calcium) doStopAndRemoveContainer(ctx context.Context, container *types.Container, ib *imageBucket, force bool) (string, error) { - lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, container.ID), int(c.config.GlobalTimeout.Seconds())) +func (c *Calcium) doStopAndRemoveContainer(ctx context.Context, container *types.Container, ib *imageBucket, force bool) (*types.Container, string, error) { + container, containerJSON, lock, err := c.doLockContainer(ctx, container) if err != nil { - return err.Error(), err + return nil, err.Error(), err } defer lock.Unlock(ctx) - // 确保是有这个容器的 - containerJSON, err := container.Inspect(ctx) - if err != nil { - return err.Error(), err - } - var message string message, err = c.doStopContainer(ctx, container, containerJSON, ib, force) if err != nil { - return message, err + return nil, message, err } if err = c.doRemoveContainer(ctx, container); err != nil { message += err.Error() } - return message, err + return container, message, err } func (c *Calcium) cleanCachedImage(ctx context.Context, ib *imageBucket) { diff --git a/cluster/calcium/replace.go b/cluster/calcium/replace.go index e6fd4d88b..8a82fdd2c 100644 --- a/cluster/calcium/replace.go +++ b/cluster/calcium/replace.go @@ -2,10 +2,8 @@ package calcium import ( "context" - "fmt" "sync" - "github.com/projecteru2/core/cluster" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" log "github.com/sirupsen/logrus" @@ -92,24 +90,15 @@ func (c *Calcium) doReplaceContainer( Success: false, Message: "", } - - // 锁住,防止删除 - lock, err := c.Lock(ctx, fmt.Sprintf(cluster.ContainerLock, container.ID), int(c.config.GlobalTimeout.Seconds())) + container, containerJSON, lock, err := c.doLockContainer(ctx, container) if err != nil { return nil, removeMessage, err } defer lock.Unlock(ctx) - // 确保是有这个容器的 - containerJSON, err := container.Inspect(ctx) - if err != nil { - return nil, removeMessage, err - } - if !utils.FilterContainer(containerJSON.Config.Labels, opts.FilterLabels) { return nil, removeMessage, types.ErrNotFitLabels } - // 拉镜像 auth, err := makeEncodedAuthConfigFromRemote(c.config.Docker.AuthConfigs, opts.Image) if err != nil { @@ -119,13 +108,11 @@ func (c *Calcium) doReplaceContainer( if err = pullImage(ctx, container.Node, opts.Image, auth); err != nil { return nil, removeMessage, err } - // 停止容器 removeMessage.Message, err = c.doStopContainer(ctx, container, containerJSON, ib, opts.Force) if err != nil { return nil, removeMessage, err } - // 获得文件 io for src, dst := range opts.Copy { stream, _, err := container.Engine.CopyFromContainer(ctx, container.ID, src) @@ -138,7 +125,6 @@ func (c *Calcium) doReplaceContainer( } opts.DeployOptions.Data[dst] = fname } - // 不涉及资源消耗,创建容器失败会被回收容器而不回收资源 // 创建成功容器会干掉之前的老容器也不会动资源,实际上实现了动态捆绑 createMessage := c.createAndStartContainer(ctx, index, container.Node, &opts.DeployOptions, container.CPU) @@ -152,13 +138,11 @@ func (c *Calcium) doReplaceContainer( } return nil, removeMessage, createMessage.Error } - // 干掉老的 if err = c.doRemoveContainer(ctx, container); err != nil { log.Errorf("[replaceAndRemove] Old container %s remove failed %v", container.ID, err) return createMessage, removeMessage, err } - removeMessage.Success = true return createMessage, removeMessage, nil }