Skip to content

Commit

Permalink
Add support for logging binary
Browse files Browse the repository at this point in the history
Signed-off-by: Maksim An <[email protected]>
  • Loading branch information
anmaxvl committed Nov 18, 2020
1 parent d672bc1 commit eeff4aa
Show file tree
Hide file tree
Showing 4 changed files with 386 additions and 4 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
*.exe
.idea
.vscode
40 changes: 36 additions & 4 deletions cmd/containerd-shim-runhcs-v1/task_hcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -124,7 +125,16 @@ func newHcsTask(

owner := filepath.Base(os.Args[0])

io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
var io cmd.UpstreamIO

u, err := url.Parse(req.Stdout)

if err != nil || u.Scheme != "binary" {
io, err = cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
} else {
io, err = cmd.NewBinaryIO(ctx, req.ID, u)
}

if err != nil {
return nil, err
}
Expand Down Expand Up @@ -177,7 +187,8 @@ func newHcsTask(
req.Bundle,
ht.isWCOW,
s.Process,
io)
io,
)

if parent != nil {
// We have a parent UVM. Listen for its exit and forcibly close this
Expand Down Expand Up @@ -284,11 +295,32 @@ func (ht *hcsTask) CreateExec(ctx context.Context, req *task.ExecProcessRequest,
return errors.Wrapf(errdefs.ErrFailedPrecondition, "exec: '' in task: '%s' must be running to create additional execs", ht.id)
}

io, err := cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
var io cmd.UpstreamIO

u, err := url.Parse(req.Stdout)

if err != nil || u.Scheme != "binary" {
io, err = cmd.NewNpipeIO(ctx, req.Stdin, req.Stdout, req.Stderr, req.Terminal)
} else {
io, err = cmd.NewBinaryIO(ctx, req.ID, u)
}

if err != nil {
return err
}
he := newHcsExec(ctx, ht.events, ht.id, ht.host, ht.c, req.ExecID, ht.init.Status().Bundle, ht.isWCOW, spec, io)
he := newHcsExec(
ctx,
ht.events,
ht.id,
ht.host,
ht.c,
req.ExecID,
ht.init.Status().Bundle,
ht.isWCOW,
spec,
io,
)

ht.execs.Store(req.ExecID, he)

// Publish the created event
Expand Down
94 changes: 94 additions & 0 deletions cmd/logging/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package logging

import (
"context"
"fmt"
"io"
"net"
"os"

"github.com/Microsoft/go-winio"
)

// Config is passed to the binary logging function
type Config struct {
ID string
Namespace string
Stdout io.Reader
Stderr io.Reader
}

// LoggerFunc is a binary logging function signature
type LoggerFunc func(context.Context, *Config, func() error) error

// Run runs LoggerFunc
func Run(fn LoggerFunc) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var (
sout, serr net.Conn
ready func() error
)

if soutEnv := os.Getenv("CONTAINER_STDOUT"); soutEnv != "" {
locOut, err := winio.DialPipeContext(ctx, soutEnv)

if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
sout = locOut
}

if serrEnv := os.Getenv("CONTAINER_STDERR"); serrEnv != "" {
locSerr, err := winio.DialPipeContext(ctx, serrEnv)

if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
serr = locSerr
}

waitEnv := os.Getenv("CONTAINER_WAIT")
if waitEnv != "" {
wait, err := winio.DialPipeContext(ctx, waitEnv)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}

// Write to wait pipe
ready = func() error {
wait.Write([]byte("#"))
return wait.Close()
}
} else {
// Noop ready func
ready = func() error { return nil }
}

config := &Config{
ID: os.Getenv("CONTAINER_ID"),
Namespace: os.Getenv("CONTAINER_NAMESPACE"),
Stdout: sout,
Stderr: serr,
}

var errCh = make(chan error, 0)

go func() {
if err := fn(ctx, config, ready); err != nil {
errCh <- err
return
}
errCh <- nil
}()

err := <-errCh
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
Loading

0 comments on commit eeff4aa

Please sign in to comment.