Skip to content

Commit

Permalink
Send HTTP Hijack headers after successful attach
Browse files Browse the repository at this point in the history
Our previous flow was to perform a hijack before passing a
connection into Libpod, and then Libpod would attach to the
container's attach socket and begin forwarding traffic.

A problem emerges: we write the attach header as soon as the
attach complete. As soon as we write the header, the client
assumes that all is ready, and sends a Start request. This Start
may be processed *before* we successfully finish attaching,
causing us to lose output.

The solution is to handle hijacking inside Libpod. Unfortunately,
this requires a downright extensive refactor of the Attach and
HTTP Exec StartAndAttach code. I think the result is an
improvement in some places (a lot more errors will be handled
with a proper HTTP error code, before the hijack occurs) but
other parts, like the relocation of printing container logs, are
just *bad*. Still, we need this fixed now to get CI back into
good shape...

Fixes #7195

Signed-off-by: Matthew Heon <[email protected]>
  • Loading branch information
mheon committed Aug 27, 2020
1 parent 7ccd821 commit 2ea9dac
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 188 deletions.
85 changes: 5 additions & 80 deletions libpod/container_api.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
package libpod

import (
"bufio"
"context"
"io/ioutil"
"net"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/containers/podman/v2/libpod/define"
"github.com/containers/podman/v2/libpod/events"
"github.com/containers/podman/v2/libpod/logs"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -267,15 +263,10 @@ func (c *Container) Attach(streams *define.AttachStreams, keys string, resize <-
// over the socket; if this is not set, but streamLogs is, only the logs will be
// sent.
// At least one of streamAttach and streamLogs must be set.
func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, streamAttach, streamLogs bool) (deferredErr error) {
isTerminal := false
if c.config.Spec.Process != nil {
isTerminal = c.config.Spec.Process.Terminal
}
// Ensure our contract of writing errors to and closing the HTTP conn is
// honored.
func (c *Container) HTTPAttach(r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, streamAttach, streamLogs bool, hijackDone chan<- bool) error {
// Ensure we don't leak a goroutine if we exit before hijack completes.
defer func() {
hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
close(hijackDone)
}()

if !c.batched {
Expand All @@ -299,74 +290,8 @@ func (c *Container) HTTPAttach(httpCon net.Conn, httpBuf *bufio.ReadWriter, stre

logrus.Infof("Performing HTTP Hijack attach to container %s", c.ID())

logSize := 0
if streamLogs {
// Get all logs for the container
logChan := make(chan *logs.LogLine)
logOpts := new(logs.LogOptions)
logOpts.Tail = -1
logOpts.WaitGroup = new(sync.WaitGroup)
errChan := make(chan error)
go func() {
var err error
// In non-terminal mode we need to prepend with the
// stream header.
logrus.Debugf("Writing logs for container %s to HTTP attach", c.ID())
for logLine := range logChan {
if !isTerminal {
device := logLine.Device
var header []byte
headerLen := uint32(len(logLine.Msg))
logSize += len(logLine.Msg)
switch strings.ToLower(device) {
case "stdin":
header = makeHTTPAttachHeader(0, headerLen)
case "stdout":
header = makeHTTPAttachHeader(1, headerLen)
case "stderr":
header = makeHTTPAttachHeader(2, headerLen)
default:
logrus.Errorf("Unknown device for log line: %s", device)
header = makeHTTPAttachHeader(1, headerLen)
}
_, err = httpBuf.Write(header)
if err != nil {
break
}
}
_, err = httpBuf.Write([]byte(logLine.Msg))
if err != nil {
break
}
_, err = httpBuf.Write([]byte("\n"))
if err != nil {
break
}
err = httpBuf.Flush()
if err != nil {
break
}
}
errChan <- err
}()
go func() {
logOpts.WaitGroup.Wait()
close(logChan)
}()
if err := c.ReadLog(context.Background(), logOpts, logChan); err != nil {
return err
}
logrus.Debugf("Done reading logs for container %s, %d bytes", c.ID(), logSize)
if err := <-errChan; err != nil {
return err
}
}
if !streamAttach {
return nil
}

c.newContainerEvent(events.Attach)
return c.ociRuntime.HTTPAttach(c, httpCon, httpBuf, streams, detachKeys, cancel)
return c.ociRuntime.HTTPAttach(c, r, w, streams, detachKeys, cancel, hijackDone, streamAttach, streamLogs)
}

// AttachResize resizes the container's terminal, which is displayed by Attach
Expand Down
24 changes: 11 additions & 13 deletions libpod/container_exec.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
package libpod

import (
"bufio"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -373,17 +372,12 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS
}

// ExecHTTPStartAndAttach starts and performs an HTTP attach to an exec session.
func (c *Container) ExecHTTPStartAndAttach(sessionID string, httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) (deferredErr error) {
func (c *Container) ExecHTTPStartAndAttach(sessionID string, r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, hijackDone chan<- bool) error {
// TODO: How do we combine streams with the default streams set in the exec session?

// The flow here is somewhat strange, because we need to determine if
// there's a terminal ASAP (for error handling).
// Until we know, assume it's true (don't add standard stream headers).
// Add a defer to ensure our invariant (HTTP session is closed) is
// maintained.
isTerminal := true
// Ensure that we don't leak a goroutine here
defer func() {
hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
close(hijackDone)
}()

if !c.batched {
Expand All @@ -399,8 +393,6 @@ func (c *Container) ExecHTTPStartAndAttach(sessionID string, httpCon net.Conn, h
if !ok {
return errors.Wrapf(define.ErrNoSuchExecSession, "container %s has no exec session with ID %s", c.ID(), sessionID)
}
// We can now finally get the real value of isTerminal.
isTerminal = session.Config.Terminal

// Verify that we are in a good state to continue
if !c.ensureState(define.ContainerStateRunning) {
Expand Down Expand Up @@ -432,7 +424,13 @@ func (c *Container) ExecHTTPStartAndAttach(sessionID string, httpCon net.Conn, h
streams.Stderr = session.Config.AttachStderr
}

pid, attachChan, err := c.ociRuntime.ExecContainerHTTP(c, session.ID(), execOpts, httpCon, httpBuf, streams, cancel)
holdConnOpen := make(chan bool)

defer func() {
close(holdConnOpen)
}()

pid, attachChan, err := c.ociRuntime.ExecContainerHTTP(c, session.ID(), execOpts, r, w, streams, cancel, hijackDone, holdConnOpen)
if err != nil {
session.State = define.ExecStateStopped
session.ExitCode = define.TranslateExecErrorToExitCode(define.ExecErrorCodeGeneric, err)
Expand Down
7 changes: 3 additions & 4 deletions libpod/oci.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package libpod

import (
"bufio"
"net"
"net/http"

"github.com/containers/podman/v2/libpod/define"
"k8s.io/client-go/tools/remotecommand"
Expand Down Expand Up @@ -63,7 +62,7 @@ type OCIRuntime interface {
// used instead. Detach keys of "" will disable detaching via keyboard.
// The streams parameter will determine which streams to forward to the
// client.
HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) error
HTTPAttach(ctr *Container, r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool, hijackDone chan<- bool, streamAttach, streamLogs bool) error
// AttachResize resizes the terminal in use by the given container.
AttachResize(ctr *Container, newSize remotecommand.TerminalSize) error

Expand All @@ -80,7 +79,7 @@ type OCIRuntime interface {
// Maintains the same invariants as ExecContainer (returns on session
// start, with a goroutine running in the background to handle attach).
// The HTTP attach itself maintains the same invariants as HTTPAttach.
ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error)
ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, cancel <-chan bool, hijackDone chan<- bool, holdConnOpen <-chan bool) (int, chan error, error)
// ExecContainerDetached executes a command in a running container, but
// does not attach to it. Returns the PID of the exec session and an
// error (if starting the exec session failed)
Expand Down
39 changes: 35 additions & 4 deletions libpod/oci_conmon_exec_linux.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package libpod

import (
"bufio"
"fmt"
"net"
"net/http"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -80,7 +80,7 @@ func (r *ConmonOCIRuntime) ExecContainer(c *Container, sessionID string, options

// ExecContainerHTTP executes a new command in an existing container and
// forwards its standard streams over an attach
func (r *ConmonOCIRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error) {
func (r *ConmonOCIRuntime) ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, req *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, cancel <-chan bool, hijackDone chan<- bool, holdConnOpen <-chan bool) (int, chan error, error) {
if streams != nil {
if !streams.Stdin && !streams.Stdout && !streams.Stderr {
return -1, nil, errors.Wrapf(define.ErrInvalidArg, "must provide at least one stream to attach to")
Expand Down Expand Up @@ -129,7 +129,7 @@ func (r *ConmonOCIRuntime) ExecContainerHTTP(ctr *Container, sessionID string, o
attachChan := make(chan error)
go func() {
// attachToExec is responsible for closing pipes
attachChan <- attachExecHTTP(ctr, sessionID, httpBuf, streams, pipes, detachKeys, options.Terminal, cancel)
attachChan <- attachExecHTTP(ctr, sessionID, req, w, streams, pipes, detachKeys, options.Terminal, cancel, hijackDone, holdConnOpen)
close(attachChan)
}()

Expand Down Expand Up @@ -496,7 +496,7 @@ func (r *ConmonOCIRuntime) startExec(c *Container, sessionID string, options *Ex
}

// Attach to a container over HTTP
func attachExecHTTP(c *Container, sessionID string, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, pipes *execPipes, detachKeys []byte, isTerminal bool, cancel <-chan bool) error {
func attachExecHTTP(c *Container, sessionID string, r *http.Request, w http.ResponseWriter, streams *HTTPAttachStreams, pipes *execPipes, detachKeys []byte, isTerminal bool, cancel <-chan bool, hijackDone chan<- bool, holdConnOpen <-chan bool) (deferredErr error) {
if pipes == nil || pipes.startPipe == nil || pipes.attachPipe == nil {
return errors.Wrapf(define.ErrInvalidArg, "must provide a start and attach pipe to finish an exec attach")
}
Expand Down Expand Up @@ -549,6 +549,37 @@ func attachExecHTTP(c *Container, sessionID string, httpBuf *bufio.ReadWriter, s
attachStdin = streams.Stdin
}

// Perform hijack
hijacker, ok := w.(http.Hijacker)
if !ok {
return errors.Errorf("unable to hijack connection")
}

httpCon, httpBuf, err := hijacker.Hijack()
if err != nil {
return errors.Wrapf(err, "error hijacking connection")
}

hijackDone <- true

// Write a header to let the client know what happened
writeHijackHeader(r, httpBuf)

// Force a flush after the header is written.
if err := httpBuf.Flush(); err != nil {
return errors.Wrapf(err, "error flushing HTTP hijack header")
}

go func() {
// We need to hold the connection open until the complete exec
// function has finished. This channel will be closed in a defer
// in that function, so we can wait for it here.
// Can't be a defer, because this would block the function from
// returning.
<-holdConnOpen
hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
}()

// Next, STDIN. Avoid entirely if attachStdin unset.
if attachStdin {
go func() {
Expand Down
Loading

0 comments on commit 2ea9dac

Please sign in to comment.