Skip to content

Commit

Permalink
Add support for logging binary (#896)
Browse files Browse the repository at this point in the history
Logging binary support and integration tests

Signed-off-by: Maksim An <[email protected]>
  • Loading branch information
anmaxvl authored Jan 7, 2021
1 parent bf55dad commit fd21b8d
Show file tree
Hide file tree
Showing 10 changed files with 678 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
*.exe
.idea
.vscode
2 changes: 2 additions & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ build_script:
- go test -c ./cri-containerd/ -tags functional
- go test -c ./functional/ -tags functional
- go test -c ./runhcs/ -tags functional
- go build -o sample-logging-driver.exe ./cri-containerd/helpers/log.go

artifacts:
- path: 'containerd-shim-runhcs-v1.exe'
Expand All @@ -43,3 +44,4 @@ artifacts:
- path: './test/cri-containerd.test.exe'
- path: './test/functional.test.exe'
- path: './test/runhcs.test.exe'
- path: './test/sample-logging-driver.exe'
41 changes: 28 additions & 13 deletions cmd/containerd-shim-runhcs-v1/task_hcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,16 @@ import (
"sync"
"time"

eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/typeurl"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"

"github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
runhcsopts "github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/options"
"github.com/Microsoft/hcsshim/cmd/containerd-shim-runhcs-v1/stats"
Expand All @@ -23,15 +33,6 @@ import (
"github.com/Microsoft/hcsshim/internal/shimdiag"
"github.com/Microsoft/hcsshim/internal/uvm"
"github.com/Microsoft/hcsshim/osversion"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/typeurl"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

func newHcsStandaloneTask(ctx context.Context, events publisher, req *task.CreateTaskRequest, s *specs.Spec) (shimTask, error) {
Expand Down Expand Up @@ -124,7 +125,7 @@ func newHcsTask(

owner := filepath.Base(os.Args[0])

io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
io, err := cmd.NewUpstreamIO(ctx, req.ID, req.Stdout, req.Stderr, req.Stdin, req.Terminal)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -181,7 +182,8 @@ func newHcsTask(
req.Bundle,
ht.isWCOW,
s.Process,
io)
io,
)

if parent != nil {
// We have a parent UVM. Listen for its exit and forcibly close this
Expand Down Expand Up @@ -288,11 +290,24 @@ func (ht *hcsTask) CreateExec(ctx context.Context, req *task.ExecProcessRequest,
return errors.Wrapf(errdefs.ErrFailedPrecondition, "exec: '' in task: '%s' must be running to create additional execs", ht.id)
}

io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
io, err := cmd.NewUpstreamIO(ctx, req.ID, req.Stdout, req.Stderr, req.Stdin, req.Terminal)
if err != nil {
return err
}
he := newHcsExec(ctx, ht.events, ht.id, ht.host, ht.c, req.ExecID, ht.init.Status().Bundle, ht.isWCOW, spec, io)

he := newHcsExec(
ctx,
ht.events,
ht.id,
ht.host,
ht.c,
req.ExecID,
ht.init.Status().Bundle,
ht.isWCOW,
spec,
io,
)

ht.execs.Store(req.ExecID, he)

// Publish the created event
Expand Down
22 changes: 22 additions & 0 deletions internal/cmd/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package cmd
import (
"context"
"io"
"net/url"

"github.com/pkg/errors"
)

// UpstreamIO is an interface describing the IO to connect to above the shim.
Expand Down Expand Up @@ -36,3 +39,22 @@ type UpstreamIO interface {
// return `""`.
Terminal() bool
}

// NewUpstreamIO returns an UpstreamIO instance. Currently we only support named pipes and binary
// logging driver for container IO. When using binary logger `stdout` and `stderr` are assumed to be
// the same and the value of `stderr` is completely ignored.
func NewUpstreamIO(ctx context.Context, id string, stdout string, stderr string, stdin string, terminal bool) (UpstreamIO, error) {
u, err := url.Parse(stdout)

// Create IO with named pipes.
if err != nil || u.Scheme == "" {
return NewNpipeIO(ctx, stdin, stdout, stderr, terminal)
}

// Create IO for binary logging driver.
if u.Scheme != "binary" {
return nil, errors.Errorf("scheme must be 'binary', got: '%s'", u.Scheme)
}

return NewBinaryIO(ctx, id, u)
}
266 changes: 266 additions & 0 deletions internal/cmd/io_binary.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
package cmd

import (
"context"
"fmt"
"io"
"net"
"net/url"
"os/exec"
"sync"
"time"

"github.com/Microsoft/go-winio"
"github.com/containerd/containerd/namespaces"
"github.com/pkg/errors"

"github.com/Microsoft/hcsshim/internal/log"
)

const (
binaryPipeFmt = `\\.\pipe\binary-%s-%s`
binaryCmdWaitTimeout = 10 * time.Second
binaryCmdStartTimeout = 10 * time.Second
)

// NewBinaryIO runs a custom binary process for pluggable shim logging driver.
//
// Container's IO will be redirected to the logging driver via named pipes, which are
// passed as "CONTAINER_STDOUT", "CONTAINER_STDERR" environment variables. The logging
// driver MUST dial a wait pipe passed via "CONTAINER_WAIT" environment variable AND CLOSE
// it to indicate that it's ready to consume the IO. For customer's convenience container ID
// and namespace are also passed via "CONTAINER_ID" and "CONTAINER_NAMESPACE".
//
// The path to the logging driver can be provided via a URL's host/path. Additional arguments
// can be passed to the logger via URL query params
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ UpstreamIO, err error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
ns = namespaces.Default
}

var stdoutPipe, stderrPipe, waitPipe io.ReadWriteCloser

stdoutPipePath := fmt.Sprintf(binaryPipeFmt, id, "stdout")
stdoutPipe, err = openNPipe(stdoutPipePath)
if err != nil {
return nil, err
}

stderrPipePath := fmt.Sprintf(binaryPipeFmt, id, "stderr")
stderrPipe, err = openNPipe(stderrPipePath)
if err != nil {
return nil, err
}

waitPipePath := fmt.Sprintf(binaryPipeFmt, id, "wait")
waitPipe, err = openNPipe(waitPipePath)
if err != nil {
return nil, err
}
defer waitPipe.Close()

envs := []string{
"CONTAINER_ID=" + id,
"CONTAINER_NAMESPACE=" + ns,
"CONTAINER_STDOUT=" + stdoutPipePath,
"CONTAINER_STDERR=" + stderrPipePath,
"CONTAINER_WAIT=" + waitPipePath,
}
cmd, err := newBinaryCmd(ctx, uri, envs)
if err != nil {
return nil, err
}

if err := cmd.Start(); err != nil {
return nil, err
}

errCh := make(chan error, 1)
// Wait for logging driver to signal to the wait pipe that it's ready to consume IO
go func() {
b := make([]byte, 1)
if _, err := waitPipe.Read(b); err != nil && err != io.EOF {
errCh <- err
return
}
errCh <- nil
}()

select {
case err = <-errCh:
if err != nil {
return nil, errors.Wrap(err, "failed to start binary logger")
}
case <-time.After(binaryCmdStartTimeout):
return nil, errors.New("failed to start binary logger: timeout")
}

return &binaryIO{
cmd: cmd,
stdout: stdoutPipePath,
sout: stdoutPipe,
stderr: stderrPipePath,
serr: stderrPipe,
}, nil
}

func newBinaryCmd(ctx context.Context, uri *url.URL, envs []string) (*exec.Cmd, error) {
if uri.Host == "" && uri.Path == "" {
return nil, errors.New("no logging driver path provided")
}

var args []string
for k, vs := range uri.Query() {
args = append(args, k)
if len(vs) > 0 && vs[0] != "" {
args = append(args, vs[0])
}
}

execPath := uri.Path
// Absolute path is required, treat "binary://path/to/binary" and "binary:///path/to/binary"
// as the same.
if uri.Host != "" {
execPath = "/" + uri.Host + uri.Path
}

cmd := exec.CommandContext(ctx, execPath, args...)
cmd.Env = append(cmd.Env, envs...)

return cmd, nil
}

var _ UpstreamIO = &binaryIO{}

// Implements UpstreamIO interface to enable shim pluggable logging
type binaryIO struct {
cmd *exec.Cmd

binaryCloser sync.Once

stdin, stdout, stderr string

sout, serr io.ReadWriteCloser
soutCloser sync.Once
}

// Close named pipes for container stdout and stderr and wait for the binary process to finish.
func (b *binaryIO) Close(ctx context.Context) {
b.soutCloser.Do(func() {
if b.sout != nil {
err := b.sout.Close()
if err != nil {
log.G(ctx).WithError(err).Errorf("error while closing stdout npipe")
}
}
if b.serr != nil {
err := b.serr.Close()
if err != nil {
log.G(ctx).WithError(err).Errorf("error while closing stderr npipe")
}
}
})
b.binaryCloser.Do(func() {
done := make(chan error)
go func() {
done <- b.cmd.Wait()
}()

select {
case err := <-done:
if err != nil {
log.G(ctx).WithError(err).Errorf("error while waiting for binary cmd to finish")
}
case <-time.After(binaryCmdWaitTimeout):
log.G(ctx).Errorf("timeout while waiting for binaryIO process to finish. Killing")
err := b.cmd.Process.Kill()
if err != nil {
log.G(ctx).WithError(err).Errorf("error while killing binaryIO process")
}
}
})
}

func (b *binaryIO) CloseStdin(ctx context.Context) {}

func (b *binaryIO) Stdin() io.Reader {
return nil
}

func (b *binaryIO) StdinPath() string {
return ""
}

func (b *binaryIO) Stdout() io.Writer {
return b.sout
}

func (b *binaryIO) StdoutPath() string {
return b.stdout
}

func (b *binaryIO) Stderr() io.Writer {
return b.serr
}

func (b *binaryIO) StderrPath() string {
return b.stderr
}

func (b *binaryIO) Terminal() bool {
return false
}

type pipe struct {
l net.Listener
con net.Conn
conErr error
conWg sync.WaitGroup
}

func openNPipe(path string) (io.ReadWriteCloser, error) {
l, err := winio.ListenPipe(path, nil)
if err != nil {
return nil, err
}

p := &pipe{l: l}
p.conWg.Add(1)

go func() {
defer p.conWg.Done()
c, err := l.Accept()
if err != nil {
p.conErr = err
return
}
p.con = c
}()
return p, nil
}

func (p *pipe) Write(b []byte) (int, error) {
p.conWg.Wait()
if p.conErr != nil {
return 0, errors.Wrap(p.conErr, "connection error")
}
return p.con.Write(b)
}

func (p *pipe) Read(b []byte) (int, error) {
p.conWg.Wait()
if p.conErr != nil {
return 0, errors.Wrap(p.conErr, "connection error")
}
return p.con.Read(b)
}

func (p *pipe) Close() error {
p.l.Close()
p.conWg.Wait()
if p.con != nil {
return p.con.Close()
}
return p.conErr
}
Loading

0 comments on commit fd21b8d

Please sign in to comment.