Skip to content

Commit

Permalink
Add support for logging binary
Browse files Browse the repository at this point in the history
Signed-off-by: Maksim An <[email protected]>
  • Loading branch information
anmaxvl committed Nov 22, 2020
1 parent 966beba commit 95c0d84
Show file tree
Hide file tree
Showing 4 changed files with 348 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
*.exe
.idea
.vscode
40 changes: 36 additions & 4 deletions cmd/containerd-shim-runhcs-v1/task_hcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -124,7 +125,16 @@ func newHcsTask(

owner := filepath.Base(os.Args[0])

io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
var io cmd.UpstreamIO

u, err := url.Parse(req.Stdout)

if err == nil && u.Scheme == "binary" {
io, err = cmd.NewBinaryIO(ctx, req.ID, u)
} else {
io, err = cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
}

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,7 +187,8 @@ func newHcsTask(
req.Bundle,
ht.isWCOW,
s.Process,
io)
io,
)

if parent != nil {
// We have a parent UVM. Listen for its exit and forcibly close this
Expand Down Expand Up @@ -284,11 +295,32 @@ func (ht *hcsTask) CreateExec(ctx context.Context, req *task.ExecProcessRequest,
return errors.Wrapf(errdefs.ErrFailedPrecondition, "exec: '' in task: '%s' must be running to create additional execs", ht.id)
}

io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
var io cmd.UpstreamIO

u, err := url.Parse(req.Stdout)

if err == nil && u.Scheme == "binary" {
io, err = cmd.NewBinaryIO(ctx, req.ID, u)
} else {
io, err = cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
}

if err != nil {
return err
}
he := newHcsExec(ctx, ht.events, ht.id, ht.host, ht.c, req.ExecID, ht.init.Status().Bundle, ht.isWCOW, spec, io)
he := newHcsExec(
ctx,
ht.events,
ht.id,
ht.host,
ht.c,
req.ExecID,
ht.init.Status().Bundle,
ht.isWCOW,
spec,
io,
)

ht.execs.Store(req.ExecID, he)

// Publish the created event
Expand Down
238 changes: 238 additions & 0 deletions internal/cmd/io_binary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
package cmd

import (
"context"
"fmt"
"io"
"net"
"net/url"
"os/exec"
"sync"
"time"

"github.com/Microsoft/go-winio"
"github.com/Microsoft/hcsshim/internal/log"
"github.com/containerd/containerd/namespaces"
"github.com/pkg/errors"
)

const (
pipeRootBinary = `\\.\pipe`
binaryCmdWaitTimeout = 5 * time.Second
)

func newBinaryCmd(ctx context.Context, uri *url.URL, id string, ns string) *exec.Cmd {
var args []string
for k, vs := range uri.Query() {
args = append(args, k)
if len(vs) > 0 && vs[0] != "" {
args = append(args, vs[0])
}
}

execPath := uri.Path
if execPath == "" {
execPath = uri.Host
}

cmd := exec.CommandContext(ctx, execPath, args...)
cmd.Env = append(cmd.Env,
"CONTAINER_ID="+id,
"CONTAINER_NAMESPACE="+ns,
)

return cmd
}

type binaryIO struct {
cmd *exec.Cmd

binaryCloser sync.Once

stdin, stdout, stderr string

sout, serr io.ReadWriteCloser
soutCloser sync.Once
}

// NewBinaryIO starts a binary logger
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ UpstreamIO, err error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}

var (
sout, serr, w io.ReadWriteCloser
)

bio := &binaryIO{}

stdoutPipe := fmt.Sprintf(`%s\binary-%s-stdout`, pipeRootBinary, id)
sout, err = openNPipe(stdoutPipe)
if err != nil {
return nil, err
}
bio.stdout = stdoutPipe
bio.sout = sout

stderrPipe := fmt.Sprintf(`%s\binary-%s-stderr`, pipeRootBinary, id)
serr, err = openNPipe(stderrPipe)
if err != nil {
return nil, err
}
bio.stderr = stderrPipe
bio.serr = serr

waitPipe := fmt.Sprintf(`%s\binary-%s-wait`, pipeRootBinary, id)
w, err = openNPipe(waitPipe)
if err != nil {
return nil, err
}

defer w.Close()

cmd := newBinaryCmd(ctx, uri, id, ns)
cmd.Env = append(cmd.Env,
"CONTAINER_STDOUT="+stdoutPipe,
"CONTAINER_STDERR="+stderrPipe,
"CONTAINER_WAIT="+waitPipe,
)

bio.cmd = cmd

if err := cmd.Start(); err != nil {
return nil, err
}

b := make([]byte, 1)
if _, err := w.Read(b); err != nil && err != io.EOF {
return nil, errors.Errorf("failed to start binary logger")
}

return bio, nil
}

var _ UpstreamIO = &binaryIO{}

func (b *binaryIO) Close(ctx context.Context) {
b.soutCloser.Do(func() {
if b.sout != nil {
err := b.sout.Close()
if err != nil {
log.G(ctx).WithError(err).Errorf("error while closing stdout npipe")
}
}
if b.serr != nil {
err := b.serr.Close()
if err != nil {
log.G(ctx).WithError(err).Errorf("error while closing stderr npipe")
}
}
})
b.binaryCloser.Do(func() {
done := make(chan error)
go func() {
done <- b.cmd.Wait()
}()

select {
case err := <-done:
if err != nil {
log.G(ctx).WithError(err).Errorf("error while waiting for binary cmd to finish")
}
case <-time.After(binaryCmdWaitTimeout):
log.G(ctx).Errorf("timeout while waiting for binaryIO process to finish. Killing")
err := b.cmd.Process.Kill()
if err != nil {
log.G(ctx).WithError(err).Errorf("error while killing binaryIO process")
}
}
})
}

func (b *binaryIO) CloseStdin(ctx context.Context) {

}

func (b *binaryIO) Stdin() io.Reader {
return nil
}

func (b *binaryIO) StdinPath() string {
return ""
}

func (b *binaryIO) Stdout() io.Writer {
return b.sout
}

func (b *binaryIO) StdoutPath() string {
return b.stdout
}

func (b *binaryIO) Stderr() io.Writer {
return b.serr
}

func (b *binaryIO) StderrPath() string {
return b.stderr
}

func (b *binaryIO) Terminal() bool {
return false
}

type pipe struct {
l net.Listener
con net.Conn
conErr error
conWg sync.WaitGroup
}

func openNPipe(path string) (io.ReadWriteCloser, error) {
l, err := winio.ListenPipe(path, nil)
if err != nil {
return nil, err
}

p := &pipe{l: l}
p.conWg.Add(1)

go func() {
defer p.conWg.Done()
c, err := l.Accept()
if err != nil {
p.conErr = err
return
}
p.con = c
}()
return p, nil
}

func (p *pipe) Write(b []byte) (int, error) {
p.conWg.Wait()
if p.conErr != nil {
return 0, errors.Wrap(p.conErr, "connection error")
}
return p.con.Write(b)
}

func (p *pipe) Read(b []byte) (int, error) {
p.conWg.Wait()
if p.conErr != nil {
return 0, errors.Wrap(p.conErr, "connection error")
}
return p.con.Read(b)
}

func (p *pipe) Close() error {
p.l.Close()
p.conWg.Wait()
if p.con != nil {
err := p.con.Close()
return err
}
return p.conErr
}
72 changes: 72 additions & 0 deletions internal/cmd/io_binary_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package cmd

import (
"context"
"net/url"
"reflect"
"testing"
)

func Test_newBinaryCmd_Key_Value_Pair(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

type config struct {
name string
urlString string
expected string
}

tests := []*config{
{
name: "use-path",
urlString: "binary:///executable?-key=value",
expected: "/executable -key value",
},
{
name: "use-host",
urlString: "binary://executable?-key=value",
expected: "executable -key value",
},
}

for _, cfg := range tests {
t.Run(cfg.name, func(t *testing.T) {
u, err := url.Parse(cfg.urlString)
if err != nil {
t.Fatalf("failed to parse url: %s", cfg.urlString)
}

cmd := newBinaryCmd(ctx, u, "id", "ns")
if cmd.String() != cfg.expected {
t.Fatalf("failed to create cmd. expected: '%s', actual '%s'", cfg.expected, cmd.String())
}
})
}
}

func Test_newBinaryCmd_flags(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

urlString := "schema:///path/to/binary?foo&bar&baz"
uri, _ := url.Parse(urlString)

expectedPath := "/path/to/binary"
expectedFlags := map[string]bool{"foo": true, "bar": true, "baz": true}

cmd := newBinaryCmd(ctx, uri, "id", "ns")

if cmd.Path != expectedPath {
t.Fatalf("invalid cmd path: %s", cmd.Path)
}

actualFlags := map[string]bool{}
for _, f := range cmd.Args[1:] {
actualFlags[f] = true
}

if !reflect.DeepEqual(actualFlags, expectedFlags) {
t.Fatalf("flags missing. expected: %v, actual %v", expectedFlags, actualFlags)
}
}

0 comments on commit 95c0d84

Please sign in to comment.