diff --git a/pkg/server/container_create_unix.go b/pkg/server/container_create_unix.go index 99cc53e7a..a3aa60b35 100644 --- a/pkg/server/container_create_unix.go +++ b/pkg/server/container_create_unix.go @@ -197,6 +197,8 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta } containerIO, err := cio.NewContainerIO(id, + meta.LogPath, + meta.Config.Labels, cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin())) if err != nil { return nil, errors.Wrap(err, "failed to create container io") diff --git a/pkg/server/container_create_windows.go b/pkg/server/container_create_windows.go index 20f4d8ad9..5fd1b64ef 100644 --- a/pkg/server/container_create_windows.go +++ b/pkg/server/container_create_windows.go @@ -189,7 +189,10 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta } containerIO, err := cio.NewContainerIO(id, + meta.LogPath, + meta.Config.Labels, cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin())) + if err != nil { return nil, errors.Wrap(err, "failed to create container io") } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index b10334d56..cd34bcdd7 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -83,13 +83,17 @@ func (c *criService) StartContainer(ctx context.Context, r *runtime.StartContain } ioCreation := func(id string) (_ containerdio.IO, err error) { - stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty()) - if err != nil { - return nil, errors.Wrap(err, "failed to create container loggers") + if cntr.IO.LoggerSchema == cio.SchemaBinary { + return cntr.IO, nil + } else { + stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, config.GetTty()) + if err != nil { + return nil, errors.Wrap(err, "failed to create container loggers") + } + cntr.IO.AddOutput("log", stdoutWC, stderrWC) + cntr.IO.Pipe() + return cntr.IO, nil } - cntr.IO.AddOutput("log", stdoutWC, stderrWC) - cntr.IO.Pipe() - return cntr.IO, nil } ctrInfo, err := container.Info(ctx) diff --git a/pkg/server/io/container_io.go b/pkg/server/io/container_io.go index 7edf627c6..417b04f7d 100644 --- a/pkg/server/io/container_io.go +++ b/pkg/server/io/container_io.go @@ -18,7 +18,9 @@ package io import ( "errors" + "fmt" "io" + "net/url" "strings" "sync" @@ -45,8 +47,16 @@ type ContainerIO struct { stderrGroup *cioutil.WriterGroup closer *wgCloser + + LoggerSchema string + LoggerPath string } +const ( + SchemaFile = "file" + SchemaBinary = "binary" +) + var _ cio.IO = &ContainerIO{} // ContainerIOOpts sets specific information to newly created ContainerIO. @@ -72,7 +82,7 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts { } // NewContainerIO creates container io. -func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) { +func NewContainerIO(id string, logPath string, labels map[string]string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) { c := &ContainerIO{ id: id, stdoutGroup: cioutil.NewWriterGroup(), @@ -86,13 +96,33 @@ func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err err if c.fifos == nil { return nil, errors.New("fifos are not set") } + + c.checkLogPath(logPath) + // Create actual fifos. - stdio, closer, err := newStdioPipes(c.fifos) - if err != nil { - return nil, err + switch c.LoggerSchema { + case SchemaFile: + stdio, closer, err := newStdioPipes(c.fifos) + if err != nil { + return nil, err + } + c.stdioPipes = stdio + c.closer = closer + + break + case SchemaBinary: + closer, err := newBinaryLogger(id, c.fifos, c.LoggerPath, labels) + if err != nil { + return nil, err + } + + c.closer = closer + + break + default: + return nil, errors.New(fmt.Sprintf("unknown scheme %s \n", c.LoggerSchema)) } - c.stdioPipes = stdio - c.closer = closer + return c, nil } @@ -216,19 +246,38 @@ func (c *ContainerIO) AddOutput(name string, stdout, stderr io.WriteCloser) (io. // Cancel cancels container io. func (c *ContainerIO) Cancel() { - c.closer.Cancel() + if c.closer != nil { + c.closer.Cancel() + } } // Wait waits container io to finish. func (c *ContainerIO) Wait() { - c.closer.Wait() + if c.closer != nil { + c.closer.Wait() + } } // Close closes all FIFOs. func (c *ContainerIO) Close() error { - c.closer.Close() + if c.closer != nil { + c.closer.Close() + } + if c.fifos != nil { return c.fifos.Close() } return nil } + +func (c *ContainerIO) checkLogPath(logPath string) { + u, err := url.Parse(logPath) + if err != nil || u.Scheme == "" { + c.LoggerSchema = "file" + c.LoggerPath = logPath + return + } + + c.LoggerSchema = u.Scheme + c.LoggerPath = u.Opaque +} diff --git a/pkg/server/io/helpers_unix.go b/pkg/server/io/helpers_unix.go index fed5aa461..257df57fc 100644 --- a/pkg/server/io/helpers_unix.go +++ b/pkg/server/io/helpers_unix.go @@ -72,3 +72,7 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) { cancel: cancel, }, nil } + +func newBinaryLogger(_ string, _ *cio.FIFOSet, _ string, , _ map[string]string) (_ *wgCloser, _ error) { + return nil, errors.New("not implemented") +} diff --git a/pkg/server/io/helpers_windows.go b/pkg/server/io/helpers_windows.go index baf392509..1960b61a0 100644 --- a/pkg/server/io/helpers_windows.go +++ b/pkg/server/io/helpers_windows.go @@ -19,9 +19,15 @@ limitations under the License. package io import ( + "encoding/json" + "fmt" "io" + "io/ioutil" "net" + "os" + "os/exec" "sync" + "time" winio "github.com/Microsoft/go-winio" "github.com/containerd/containerd/cio" @@ -30,6 +36,9 @@ import ( "golang.org/x/net/context" ) +const binaryIOProcStartTimeout = 10 * time.Second +const binaryIOProcTermTimeout = 10 * time.Second // Give logger process solid 10 seconds for cleanup + type delayedConnection struct { l net.Listener con net.Conn @@ -178,3 +187,110 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) { cancel: cancel, }, nil } + +type binaryCloser struct { + cmd *exec.Cmd + signalFileName string +} + +func (this *binaryCloser) Close() error { + if this.cmd == nil || this.cmd.Process == nil { + return nil + } + + os.Remove(this.signalFileName) + + done := make(chan error, 1) + defer close(done) + + go func() { + done <- this.cmd.Wait() + }() + + select { + case err := <-done: + return err + case <-time.After(binaryIOProcTermTimeout): + log.L.Warn("failed to wait for customer logger process to exit, killing") + + err := this.cmd.Process.Kill() + if err != nil { + return errors.Wrap(err, "failed to kill customer logger process") + } + + return nil + } +} + +func newBinaryLogger(id string, fifos *cio.FIFOSet, binaryPath string, labels map[string]string) (_ *wgCloser, err error) { + var ( + set []io.Closer + ctx, cancel = context.WithCancel(context.Background()) + ) + + started := make(chan bool) + defer close(started) + + defer func() { + if err != nil { + for _, f := range set { + f.Close() + } + cancel() + } + }() + + signalFileName, err := getSignalFileName(id) + if err != nil { + log.L.WithError(err).Errorf("failed to create tempory signal file %s", signalFileName) + return nil, err + } + + labelData, err := json.Marshal(labels) + if err != nil { + log.L.WithError(err).Errorf("failed to serialize labels") + return + } + + labelStr := string(labelData) + cmd := exec.Command(binaryPath, fifos.Stdout, fifos.Stderr, signalFileName, id, labelStr) + + if err := cmd.Start(); err != nil { + return nil, err + } + + go func() { + for start := time.Now(); time.Now().Sub(start) < binaryIOProcStartTimeout; { + if _, err := os.Stat(signalFileName); os.IsNotExist(err) { + time.Sleep(time.Second / 2) + } else { + started <- true + return + } + } + started <- false + }() + + // Wait until the logger started + if !<-started { + log.L.WithError(err).Errorf("failed to create signal file %s", signalFileName) + return nil, err + } + + set = append(set, &binaryCloser{ + cmd: cmd, + signalFileName: signalFileName, + }) + + return &wgCloser{ + wg: &sync.WaitGroup{}, + set: set, + ctx: ctx, + cancel: cancel, + }, nil +} + +func getSignalFileName(id string) (string, error) { + tempdir, err := ioutil.TempDir("", id) + return fmt.Sprintf("%s\\logsignal-%s", tempdir, id), err +} diff --git a/pkg/server/restart.go b/pkg/server/restart.go index 30b1fa625..c8d4fba24 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -184,28 +184,35 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe err = func() error { // Load up-to-date status from containerd. t, err := cntr.Task(ctx, func(fifos *containerdio.FIFOSet) (_ containerdio.IO, err error) { - stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty()) - if err != nil { - return nil, err - } - defer func() { - if err != nil { - if stdoutWC != nil { - stdoutWC.Close() - } - if stderrWC != nil { - stderrWC.Close() - } - } - }() containerIO, err = cio.NewContainerIO(id, + meta.LogPath, + meta.Config.Labels, cio.WithFIFOs(fifos), ) if err != nil { return nil, err } - containerIO.AddOutput("log", stdoutWC, stderrWC) - containerIO.Pipe() + + if containerIO.LoggerSchema == cio.SchemaFile { + stdoutWC, stderrWC, err := c.createContainerLoggers(meta.LogPath, meta.Config.GetTty()) + if err != nil { + return nil, err + } + defer func() { + if err != nil { + if stdoutWC != nil { + stdoutWC.Close() + } + if stderrWC != nil { + stderrWC.Close() + } + } + }() + + containerIO.AddOutput("log", stdoutWC, stderrWC) + containerIO.Pipe() + } + return containerIO, nil }) if err != nil && !errdefs.IsNotFound(err) { @@ -236,6 +243,8 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe // containerd got restarted during that. In that case, we still // treat the container as `CREATED`. containerIO, err = cio.NewContainerIO(id, + meta.LogPath, + meta.Config.Labels, cio.WithNewFIFOs(volatileContainerDir, meta.Config.GetTty(), meta.Config.GetStdin()), ) if err != nil {