Skip to content

Commit

Permalink
Merge branch 'master' into 'master'
Browse files Browse the repository at this point in the history
refactor

See merge request !82
  • Loading branch information
CMGS committed Apr 17, 2017
2 parents cb01060 + 9b9814d commit 021a4f0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 25 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.29
0.7.29a
44 changes: 20 additions & 24 deletions cluster/calcium/run_and_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package calcium
import (
"bufio"
"fmt"
"sync"

log "github.com/Sirupsen/logrus"
enginetypes "github.com/docker/docker/api/types"
Expand All @@ -27,14 +26,12 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan
}

go func() {
wg := &sync.WaitGroup{}
defer log.Info("[RunAndWait] Finish run and wait for containers")
defer close(ch)
defer wg.Wait()
logsOpts := enginetypes.ContainerLogsOptions{Follow: true, ShowStdout: true, ShowStderr: true}

ids := map[string]*types.Node{}
for message := range createChan {
wg.Add(1)
if message.ContainerID == "" {
log.Errorf("[RunAndWait] Can't find container id %s", err.Error())
continue
Expand All @@ -46,9 +43,9 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan
continue
}

go func(node *types.Node, message *types.CreateContainerMessage) {
defer wg.Done()
resp, err := node.Engine.ContainerLogs(context.Background(), message.ContainerID, logsOpts)
ids[message.ContainerID] = node
go func(node *types.Node, containerID string) {
resp, err := node.Engine.ContainerLogs(context.Background(), containerID, logsOpts)
if err != nil {
log.Errorf("[RunAndWait] Failed to get logs, %s", err.Error())
return
Expand All @@ -58,31 +55,30 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan
scanner := bufio.NewScanner(stream)
for scanner.Scan() {
data := scanner.Bytes()
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: data}
log.Debugf("[RunAndWait] %s %s", message.ContainerID[:12], data)
ch <- &types.RunAndWaitMessage{ContainerID: containerID, Data: data}
log.Debugf("[RunAndWait] %s %s", containerID[:12], data)
}

if err := scanner.Err(); err != nil {
log.Errorf("[RunAndWait] Parse log failed, %s", err.Error())
return
}
}(node, message.ContainerID)
}

container, err := c.GetContainer(message.ContainerID)
if err != nil {
log.Errorf("[RunAndWait] Container not found, %s", err.Error())
return
}

containerJSON, err := container.Inspect()
defer func() { go c.removeOneContainer(container, containerJSON) }()
exitData := []byte(fmt.Sprintf("[exitcode] %d", containerJSON.State.ExitCode))
if err != nil {
exitData = []byte(fmt.Sprintf("[exitcode]unknown %s", err.Error()))
}
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: exitData}
log.Infof("[RunAndWait] Container %s finished, remove", message.ContainerID)
}(node, message)
rmids := []string{}
for id, node := range ids {
rmids = append(rmids, id)
code, err := node.Engine.ContainerWait(context.Background(), id)
exitData := []byte(fmt.Sprintf("[exitcode] %d", code))
if err != nil {
log.Errorf("%s run failed, %s", id[:12], err.Error())
exitData = []byte(fmt.Sprintf("[exitcode]unknown %s", err.Error()))
}
ch <- &types.RunAndWaitMessage{ContainerID: id, Data: exitData}
log.Infof("[RunAndWait] Container %s finished, remove", id[:12])
}
go c.RemoveContainer(rmids)
}()

return ch, nil
Expand Down

0 comments on commit 021a4f0

Please sign in to comment.