Skip to content

Commit

Permalink
Merge branch 'stdcopy' into 'master'
Browse files Browse the repository at this point in the history
use stdcopy

See merge request !80
  • Loading branch information
tonic committed Apr 12, 2017
2 parents e5356e4 + e4105bd commit 3b86066
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.27d
0.7.28
36 changes: 22 additions & 14 deletions cluster/calcium/run_and_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package calcium
import (
"bufio"
"fmt"
"io"
"sync"

log "github.com/Sirupsen/logrus"
enginetypes "github.com/docker/docker/api/types"
"github.com/docker/docker/pkg/stdcopy"
"gitlab.ricebook.net/platform/core/types"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -49,39 +51,45 @@ func (c *calcium) RunAndWait(specs types.Specs, opts *types.DeployOptions) (chan
defer wg.Done()
resp, err := node.Engine.ContainerLogs(context.Background(), message.ContainerID, logsOpts)
if err != nil {
data := fmt.Sprintf("[RunAndWait] Failed to get logs, %s", err.Error())
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(data)}
log.Errorf("[RunAndWait] Failed to get logs, %s", err.Error())
return
}

scanner := bufio.NewScanner(resp)
outr, outw := io.Pipe()
errr, errw := io.Pipe()
stream := io.MultiReader(outr, errr)
go func() {
defer resp.Close()
defer outw.Close()
defer errw.Close()
stdcopy.StdCopy(outw, errw, resp)
}()

scanner := bufio.NewScanner(stream)
for scanner.Scan() {
data := scanner.Bytes()
//log.Debugf("[RunAndWait] %s %s", message.ContainerID[:12], data[utils.StreamPrefix:])
m := &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: data}
ch <- m
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: data}
log.Debugf("[RunAndWait] %s %s", message.ContainerID[:12], data)
}

if err := scanner.Err(); err != nil {
data := fmt.Sprintf("[RunAndWait] Parse log failed, %s", err.Error())
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(data)}
log.Errorf("[RunAndWait] Parse log failed, %s", err.Error())
return
}

container, err := c.GetContainer(message.ContainerID)
if err != nil {
data := fmt.Sprintf("[RunAndWait] Container not found, %s", err.Error())
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(data)}
log.Errorf("[RunAndWait] Container not found, %s", err.Error())
return
}

containerJSON, err := container.Inspect()
defer func() { go c.removeOneContainer(container, containerJSON) }()
if err == nil {
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(fmt.Sprintf("[exitcode] %d", containerJSON.State.ExitCode))}
} else {
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: []byte(fmt.Sprintf("[exitcode]unknown %s", err.Error()))}
exitData := []byte(fmt.Sprintf("[exitcode] %d", containerJSON.State.ExitCode))
if err != nil {
exitData = []byte(fmt.Sprintf("[exitcode]unknown %s", err.Error()))
}
ch <- &types.RunAndWaitMessage{ContainerID: message.ContainerID, Data: exitData}
log.Infof("[RunAndWait] Container %s finished, remove", message.ContainerID)
}(node, message)
}
Expand Down
1 change: 0 additions & 1 deletion utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const (
letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
shortenLength = 7
CpuPeriodBase = 100000
StreamPrefix = 8
)

func RandomString(n int) string {
Expand Down

0 comments on commit 3b86066

Please sign in to comment.