Skip to content

Commit

Permalink
Add support for logging binary
Browse files Browse the repository at this point in the history
Signed-off-by: Maksim An <[email protected]>
  • Loading branch information
anmaxvl committed Nov 19, 2020
1 parent d672bc1 commit 0a3af6f
Show file tree
Hide file tree
Showing 4 changed files with 376 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
*.exe
.idea
.vscode
40 changes: 36 additions & 4 deletions cmd/containerd-shim-runhcs-v1/task_hcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -124,7 +125,16 @@ func newHcsTask(

owner := filepath.Base(os.Args[0])

io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
var io cmd.UpstreamIO

u, err := url.Parse(req.Stdout)

if err != nil || u.Scheme != "binary" {
io, err = cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
} else {
io, err = cmd.NewBinaryIO(ctx, req.ID, u)
}

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,7 +187,8 @@ func newHcsTask(
req.Bundle,
ht.isWCOW,
s.Process,
io)
io,
)

if parent != nil {
// We have a parent UVM. Listen for its exit and forcibly close this
Expand Down Expand Up @@ -284,11 +295,32 @@ func (ht *hcsTask) CreateExec(ctx context.Context, req *task.ExecProcessRequest,
return errors.Wrapf(errdefs.ErrFailedPrecondition, "exec: '' in task: '%s' must be running to create additional execs", ht.id)
}

io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
var io cmd.UpstreamIO

u, err := url.Parse(req.Stdout)

if err != nil || u.Scheme != "binary" {
io, err = cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
} else {
io, err = cmd.NewBinaryIO(ctx, req.ID, u)
}

if err != nil {
return err
}
he := newHcsExec(ctx, ht.events, ht.id, ht.host, ht.c, req.ExecID, ht.init.Status().Bundle, ht.isWCOW, spec, io)
he := newHcsExec(
ctx,
ht.events,
ht.id,
ht.host,
ht.c,
req.ExecID,
ht.init.Status().Bundle,
ht.isWCOW,
spec,
io,
)

ht.execs.Store(req.ExecID, he)

// Publish the created event
Expand Down
249 changes: 249 additions & 0 deletions internal/cmd/io_binary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
package cmd

import (
"context"
"fmt"
"io"
"net"
"net/url"
"os/exec"
"sync"
"time"

"github.com/Microsoft/go-winio"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/containerd/containerd/namespaces"
"github.com/pkg/errors"
)

const (
pipeRootBinary = `\\.\pipe`
binaryCmdWaitTimeout = 5 * time.Second
)

func newBinaryCmd(ctx context.Context, uri *url.URL, id string, ns string) *exec.Cmd {
var args []string
for k, vs := range uri.Query() {
args = append(args, k)
if len(vs) > 0 {
args = append(args, vs[0])
}
}

execPath := uri.Path

cmd := exec.CommandContext(ctx, execPath, args...)
cmd.Env = append(cmd.Env,
"CONTAINER_ID="+id,
"CONTAINER_NAMESPACE="+ns,
)

return cmd
}

type binaryIO struct {
cmd *exec.Cmd

binaryCloser sync.Once

stdin, stdout, stderr string

sout, serr, wait io.ReadWriteCloser
soutCloser sync.Once
}

// NewBinaryIO starts a binary logger
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ UpstreamIO, err error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}

var (
sout, serr, w io.ReadWriteCloser
)

bio := &binaryIO{}

stdoutPipe := fmt.Sprintf(`%s\binary-%s-stdout`, pipeRootBinary, id)
sout, err = openNPipe(stdoutPipe)
if err != nil {
return nil, err
}
bio.stdout = stdoutPipe
bio.sout = sout

stderrPipe := fmt.Sprintf(`%s\binary-%s-stderr`, pipeRootBinary, id)
serr, err = openNPipe(stderrPipe)
if err != nil {
return nil, err
}
bio.stderr = stderrPipe
bio.serr = serr

waitPipe := fmt.Sprintf(`%s\binary-%s-wait`, pipeRootBinary, id)
w, err = openNPipe(waitPipe)
if err != nil {
return nil, err
}
bio.wait = w

cmd := newBinaryCmd(ctx, uri, id, ns)
cmd.Env = append(cmd.Env,
"CONTAINER_STDOUT="+stdoutPipe,
"CONTAINER_STDERR="+stderrPipe,
"CONTAINER_WAIT="+waitPipe,
)

bio.cmd = cmd

started := make(chan bool, 1)
if err := cmd.Start(); err != nil {
return nil, err
}

go func() {
defer w.Close()

b := make([]byte, 1)
for i := 0; i < 10; i++ {
v, err := w.Read(b)
if v != 0 && err == nil {
started <- true
return
}
}

started <- false
}()

if !<-started {
return nil, errors.Errorf("failed to start binary logger")
}

return bio, nil
}

var _ UpstreamIO = &binaryIO{}

func (b *binaryIO) Close(ctx context.Context) {
b.soutCloser.Do(func() {
if b.sout != nil {
err := b.sout.Close()
if err != nil {
log.G(ctx).WithError(err).Errorf("error while closing stdout npipe")
}
}
if b.serr != nil {
err := b.serr.Close()
if err != nil {
log.G(ctx).WithError(err).Errorf("error while closing stderr npipe")
}
}
})
b.binaryCloser.Do(func() {
done := make(chan error, 1)
go func() {
done <- b.cmd.Wait()
}()

select {
case err := <-done:
if err != nil {
log.G(ctx).WithError(err).Errorf("error while waiting for binary cmd to finish")
}
case <-time.After(binaryCmdWaitTimeout):
log.G(ctx).Errorf("timeout while waiting for binaryIO process to finish. Killing")
err := b.cmd.Process.Kill()
if err != nil {
log.G(ctx).WithError(err).Errorf("error while killing binaryIO process")
}
}
})
}

func (b *binaryIO) CloseStdin(ctx context.Context) {

}

func (b *binaryIO) Stdin() io.Reader {
return nil
}

func (b *binaryIO) StdinPath() string {
return ""
}

func (b *binaryIO) Stdout() io.Writer {
return b.sout
}

func (b *binaryIO) StdoutPath() string {
return b.stdout
}

func (b *binaryIO) Stderr() io.Writer {
return b.serr
}

func (b *binaryIO) StderrPath() string {
return b.stderr
}

func (b *binaryIO) Terminal() bool {
return false
}

type pipe struct {
l net.Listener
con net.Conn
conErr error
conWg sync.WaitGroup
}

func openNPipe(path string) (io.ReadWriteCloser, error) {
l, err := winio.ListenPipe(path, nil)
if err != nil {
return nil, err
}

p := &pipe{l: l}
p.conWg.Add(1)

go func() {
defer p.conWg.Done()
c, err := l.Accept()
if err != nil {
p.conErr = err
return
}
p.con = c
}()
return p, nil
}

func (p *pipe) Write(b []byte) (int, error) {
p.conWg.Wait()
if p.conErr != nil {
return 0, errors.Wrap(p.conErr, "connection error")
}
return p.con.Write(b)
}

func (p *pipe) Read(b []byte) (int, error) {
p.conWg.Wait()
if p.conErr != nil {
return 0, errors.Wrap(p.conErr, "connection error")
}
return p.con.Read(b)
}

func (p *pipe) Close() error {
p.l.Close()
p.conWg.Wait()
if p.con != nil {
err := p.con.Close()
return err
}
return p.conErr
}
Loading

0 comments on commit 0a3af6f

Please sign in to comment.