diff --git a/VERSION b/VERSION index cb80b803c..ec9f4fa6f 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.7.29 +0.7.29a diff --git a/cluster/calcium/run_and_wait.go b/cluster/calcium/run_and_wait.go index a1baa6e76..4f5fb5481 100644 --- a/cluster/calcium/run_and_wait.go +++ b/cluster/calcium/run_and_wait.go @@ -3,7 +3,6 @@ package calcium import ( "bufio" "fmt" - "sync" log "github.com/Sirupsen/logrus" enginetypes "github.com/docker/docker/api/types" @@ -27,14 +26,12 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan } go func() { - wg := &sync.WaitGroup{} defer log.Info("[RunAndWait] Finish run and wait for containers") defer close(ch) - defer wg.Wait() logsOpts := enginetypes.ContainerLogsOptions{Follow: true, ShowStdout: true, ShowStderr: true} + ids := map[string]*types.Node{} for message := range createChan { - wg.Add(1) if message.ContainerID == "" { log.Errorf("[RunAndWait] Can't find container id %s", err.Error()) continue @@ -46,9 +43,9 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan continue } - go func(node *types.Node, message *types.CreateContainerMessage) { - defer wg.Done() - resp, err := node.Engine.ContainerLogs(context.Background(), message.ContainerID, logsOpts) + ids[message.ContainerID] = node + go func(node *types.Node, containerID string) { + resp, err := node.Engine.ContainerLogs(context.Background(), containerID, logsOpts) if err != nil { log.Errorf("[RunAndWait] Failed to get logs, %s", err.Error()) return @@ -58,31 +55,30 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan scanner := bufio.NewScanner(stream) for scanner.Scan() { data := scanner.Bytes() - ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: data} - log.Debugf("[RunAndWait] %s %s", message.ContainerID[:12], data) + ch <- &types.RunAndWaitMessage{ContainerID: containerID, Data: data} + log.Debugf("[RunAndWait] %s %s", containerID[:12], data) } if err := scanner.Err(); err != nil { log.Errorf("[RunAndWait] Parse log failed, %s", err.Error()) return } + }(node, message.ContainerID) + } - container, err := c.GetContainer(message.ContainerID) - if err != nil { - log.Errorf("[RunAndWait] Container not found, %s", err.Error()) - return - } - - containerJSON, err := container.Inspect() - defer func() { go c.removeOneContainer(container, containerJSON) }() - exitData := []byte(fmt.Sprintf("[exitcode] %d", containerJSON.State.ExitCode)) - if err != nil { - exitData = []byte(fmt.Sprintf("[exitcode]unknown %s", err.Error())) - } - ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: exitData} - log.Infof("[RunAndWait] Container %s finished, remove", message.ContainerID) - }(node, message) + rmids := []string{} + for id, node := range ids { + rmids = append(rmids, id) + code, err := node.Engine.ContainerWait(context.Background(), id) + exitData := []byte(fmt.Sprintf("[exitcode] %d", code)) + if err != nil { + log.Errorf("%s run failed, %s", id[:12], err.Error()) + exitData = []byte(fmt.Sprintf("[exitcode]unknown %s", err.Error())) + } + ch <- &types.RunAndWaitMessage{ContainerID: id, Data: exitData} + log.Infof("[RunAndWait] Container %s finished, remove", id[:12]) } + go c.RemoveContainer(rmids) }() return ch, nil