diff --git a/VERSION b/VERSION index dcdb62a29..d96db7c42 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.7.27d +0.7.28 diff --git a/cluster/calcium/run_and_wait.go b/cluster/calcium/run_and_wait.go index 0d5495c8a..74de0a9cd 100644 --- a/cluster/calcium/run_and_wait.go +++ b/cluster/calcium/run_and_wait.go @@ -3,10 +3,12 @@ package calcium import ( "bufio" "fmt" + "io" "sync" log "github.com/Sirupsen/logrus" enginetypes "github.com/docker/docker/api/types" + "github.com/docker/docker/pkg/stdcopy" "gitlab.ricebook.net/platform/core/types" "golang.org/x/net/context" ) @@ -49,39 +51,45 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan defer wg.Done() resp, err := node.Engine.ContainerLogs(context.Background(), message.ContainerID, logsOpts) if err != nil { - data := fmt.Sprintf("[RunAndWait] Failed to get logs, %s", err.Error()) - ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(data)} + log.Errorf("[RunAndWait] Failed to get logs, %s", err.Error()) return } - scanner := bufio.NewScanner(resp) + outr, outw := io.Pipe() + errr, errw := io.Pipe() + stream := io.MultiReader(outr, errr) + go func() { + defer resp.Close() + defer outw.Close() + defer errw.Close() + stdcopy.StdCopy(outw, errw, resp) + }() + + scanner := bufio.NewScanner(stream) for scanner.Scan() { data := scanner.Bytes() - //log.Debugf("[RunAndWait] %s %s", message.ContainerID[:12], data[utils.StreamPrefix:]) - m := &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: data} - ch <- m + ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: data} + log.Debugf("[RunAndWait] %s %s", message.ContainerID[:12], data) } if err := scanner.Err(); err != nil { - data := fmt.Sprintf("[RunAndWait] Parse log failed, %s", err.Error()) - ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(data)} + log.Errorf("[RunAndWait] Parse log failed, %s", err.Error()) return } container, err := c.GetContainer(message.ContainerID) if err != nil { - data := fmt.Sprintf("[RunAndWait] Container not found, %s", err.Error()) - ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(data)} + log.Errorf("[RunAndWait] Container not found, %s", err.Error()) return } containerJSON, err := container.Inspect() defer func() { go c.removeOneContainer(container, containerJSON) }() - if err == nil { - ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(fmt.Sprintf("[exitcode] %d", containerJSON.State.ExitCode))} - } else { - ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(fmt.Sprintf("[exitcode]unknown %s", err.Error()))} + exitData := []byte(fmt.Sprintf("[exitcode] %d", containerJSON.State.ExitCode)) + if err != nil { + exitData = []byte(fmt.Sprintf("[exitcode]unknown %s", err.Error())) } + ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: exitData} log.Infof("[RunAndWait] Container %s finished, remove", message.ContainerID) }(node, message) } diff --git a/utils/utils.go b/utils/utils.go index 814311c23..dbeac1d61 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -18,7 +18,6 @@ const ( letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" shortenLength = 7 CpuPeriodBase = 100000 - StreamPrefix = 8 ) func RandomString(n int) string {