Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add customer logging support #75

Open
wants to merge 2 commits into
base: windows_port
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/server/container_create_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
}

containerIO, err := cio.NewContainerIO(id,
meta.LogPath,
meta.Config.Labels,
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))
if err != nil {
return nil, errors.Wrap(err, "failed to create container io")
Expand Down
3 changes: 3 additions & 0 deletions pkg/server/container_create_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,10 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
}

containerIO, err := cio.NewContainerIO(id,
meta.LogPath,
meta.Config.Labels,
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))

if err != nil {
return nil, errors.Wrap(err, "failed to create container io")
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/server/container_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,17 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain
}

ioCreation := func(id string) (_ containerdio.IO, err error) {
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty())
if err != nil {
return nil, errors.Wrap(err, "failed to create container loggers")
if cntr.IO.LoggerSchema == cio.SchemaBinary {
return cntr.IO, nil
} else {
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty())
if err != nil {
return nil, errors.Wrap(err, "failed to create container loggers")
}
cntr.IO.AddOutput("log", stdoutWC, stderrWC)
cntr.IO.Pipe()
return cntr.IO, nil
}
cntr.IO.AddOutput("log", stdoutWC, stderrWC)
cntr.IO.Pipe()
return cntr.IO, nil
}

ctrInfo, err := container.Info(ctx)
Expand Down
67 changes: 58 additions & 9 deletions pkg/server/io/container_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package io

import (
"errors"
"fmt"
"io"
"net/url"
"strings"
"sync"

Expand All @@ -45,8 +47,16 @@ type ContainerIO struct {
stderrGroup *cioutil.WriterGroup

closer *wgCloser

LoggerSchema string
LoggerPath string
}

const (
SchemaFile = "file"
SchemaBinary = "binary"
)

var _ cio.IO = &ContainerIO{}

// ContainerIOOpts sets specific information to newly created ContainerIO.
Expand All @@ -72,7 +82,7 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
}

// NewContainerIO creates container io.
func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
func NewContainerIO(id string, logPath string, labels map[string]string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
c := &ContainerIO{
id: id,
stdoutGroup: cioutil.NewWriterGroup(),
Expand All @@ -86,13 +96,33 @@ func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err err
if c.fifos == nil {
return nil, errors.New("fifos are not set")
}

c.checkLogPath(logPath)

// Create actual fifos.
stdio, closer, err := newStdioPipes(c.fifos)
if err != nil {
return nil, err
switch c.LoggerSchema {
case SchemaFile:
stdio, closer, err := newStdioPipes(c.fifos)
if err != nil {
return nil, err
}
c.stdioPipes = stdio
c.closer = closer

break
case SchemaBinary:
closer, err := newBinaryLogger(id, c.fifos, c.LoggerPath, labels)
if err != nil {
return nil, err
}

c.closer = closer

break
default:
return nil, errors.New(fmt.Sprintf("unknown scheme %s \n", c.LoggerSchema))
}
c.stdioPipes = stdio
c.closer = closer

return c, nil
}

Expand Down Expand Up @@ -216,19 +246,38 @@ func (c *ContainerIO) AddOutput(name string, stdout, stderr io.WriteCloser) (io.

// Cancel cancels container io.
func (c *ContainerIO) Cancel() {
c.closer.Cancel()
if c.closer != nil {
c.closer.Cancel()
}
}

// Wait waits container io to finish.
func (c *ContainerIO) Wait() {
c.closer.Wait()
if c.closer != nil {
c.closer.Wait()
}
}

// Close closes all FIFOs.
func (c *ContainerIO) Close() error {
c.closer.Close()
if c.closer != nil {
c.closer.Close()
}

if c.fifos != nil {
return c.fifos.Close()
}
return nil
}

func (c *ContainerIO) checkLogPath(logPath string) {
u, err := url.Parse(logPath)
if err != nil || u.Scheme == "" {
c.LoggerSchema = "file"
c.LoggerPath = logPath
return
}

c.LoggerSchema = u.Scheme
c.LoggerPath = u.Opaque
}
4 changes: 4 additions & 0 deletions pkg/server/io/helpers_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
cancel: cancel,
}, nil
}

func newBinaryLogger(_ string, _ *cio.FIFOSet, _ string, , _ map[string]string) (_ *wgCloser, _ error) {
return nil, errors.New("not implemented")
}
116 changes: 116 additions & 0 deletions pkg/server/io/helpers_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,15 @@ limitations under the License.
package io

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"os"
"os/exec"
"sync"
"time"

winio "github.com/Microsoft/go-winio"
"github.com/containerd/containerd/cio"
Expand All @@ -30,6 +36,9 @@ import (
"golang.org/x/net/context"
)

const binaryIOProcStartTimeout = 10 * time.Second
const binaryIOProcTermTimeout = 10 * time.Second // Give logger process solid 10 seconds for cleanup

type delayedConnection struct {
l net.Listener
con net.Conn
Expand Down Expand Up @@ -178,3 +187,110 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
cancel: cancel,
}, nil
}

type binaryCloser struct {
cmd *exec.Cmd
signalFileName string
}

func (this *binaryCloser) Close() error {
if this.cmd == nil || this.cmd.Process == nil {
return nil
}

os.Remove(this.signalFileName)

done := make(chan error, 1)
defer close(done)

go func() {
done <- this.cmd.Wait()
}()

select {
case err := <-done:
return err
case <-time.After(binaryIOProcTermTimeout):
log.L.Warn("failed to wait for customer logger process to exit, killing")

err := this.cmd.Process.Kill()
if err != nil {
return errors.Wrap(err, "failed to kill customer logger process")
}

return nil
}
}

func newBinaryLogger(id string, fifos *cio.FIFOSet, binaryPath string, labels map[string]string) (_ *wgCloser, err error) {
var (
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
)

started := make(chan bool)
defer close(started)

defer func() {
if err != nil {
for _, f := range set {
f.Close()
}
cancel()
}
}()

signalFileName, err := getSignalFileName(id)
if err != nil {
log.L.WithError(err).Errorf("failed to create tempory signal file %s", signalFileName)
return nil, err
}

labelData, err := json.Marshal(labels)
if err != nil {
log.L.WithError(err).Errorf("failed to serialize labels")
return
}

labelStr := string(labelData)
cmd := exec.Command(binaryPath, fifos.Stdout, fifos.Stderr, signalFileName, id, labelStr)

if err := cmd.Start(); err != nil {
return nil, err
}

go func() {
for start := time.Now(); time.Now().Sub(start) < binaryIOProcStartTimeout; {
if _, err := os.Stat(signalFileName); os.IsNotExist(err) {
time.Sleep(time.Second / 2)
} else {
started <- true
return
}
}
started <- false
}()

// Wait until the logger started
if !<-started {
log.L.WithError(err).Errorf("failed to create signal file %s", signalFileName)
return nil, err
}

set = append(set, &binaryCloser{
cmd: cmd,
signalFileName: signalFileName,
})

return &wgCloser{
wg: &sync.WaitGroup{},
set: set,
ctx: ctx,
cancel: cancel,
}, nil
}

func getSignalFileName(id string) (string, error) {
tempdir, err := ioutil.TempDir("", id)
return fmt.Sprintf("%s\\logsignal-%s", tempdir, id), err
}
41 changes: 25 additions & 16 deletions pkg/server/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,28 +184,35 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
err = func() error {
// Load up-to-date status from containerd.
t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (_ containerdio.IO, err error) {
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty())
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if stdoutWC != nil {
stdoutWC.Close()
}
if stderrWC != nil {
stderrWC.Close()
}
}
}()
containerIO, err = cio.NewContainerIO(id,
meta.LogPath,
meta.Config.Labels,
cio.WithFIFOs(fifos),
)
if err != nil {
return nil, err
}
containerIO.AddOutput("log", stdoutWC, stderrWC)
containerIO.Pipe()

if containerIO.LoggerSchema == cio.SchemaFile {
stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty())
if err != nil {
return nil, err
}
defer func() {
if err != nil {
if stdoutWC != nil {
stdoutWC.Close()
}
if stderrWC != nil {
stderrWC.Close()
}
}
}()

containerIO.AddOutput("log", stdoutWC, stderrWC)
containerIO.Pipe()
}

return containerIO, nil
})
if err != nil && !errdefs.IsNotFound(err) {
Expand Down Expand Up @@ -236,6 +243,8 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
// containerd got restarted during that. In that case, we still
// treat the container as `CREATED`.
containerIO, err = cio.NewContainerIO(id,
meta.LogPath,
meta.Config.Labels,
cio.WithNewFIFOs(volatileContainerDir, meta.Config.GetTty(), meta.Config.GetStdin()),
)
if err != nil {
Expand Down