Skip to content

Commit

Permalink
Merge pull request #5831 from mheon/exec_http_attach
Browse files Browse the repository at this point in the history
APIv2 ExecStart (Attached Only)
  • Loading branch information
openshift-merge-robot authored May 15, 2020
2 parents c61a45c + 6d1e5c7 commit 343ab99
Show file tree
Hide file tree
Showing 9 changed files with 653 additions and 194 deletions.
227 changes: 179 additions & 48 deletions libpod/container_exec.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package libpod

import (
"bufio"
"io/ioutil"
"net"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -102,7 +104,7 @@ func (e *ExecSession) Inspect() (*define.InspectExecSession, error) {
}

output := new(define.InspectExecSession)
output.CanRemove = e.State != define.ExecStateRunning
output.CanRemove = e.State == define.ExecStateStopped
output.ContainerID = e.ContainerId
if e.Config.DetachKeys != nil {
output.DetachKeys = *e.Config.DetachKeys
Expand Down Expand Up @@ -156,9 +158,6 @@ func (c *Container) ExecCreate(config *ExecConfig) (string, error) {
if len(config.Command) == 0 {
return "", errors.Wrapf(define.ErrInvalidArg, "must provide a non-empty command to start an exec session")
}
if config.Terminal && (config.AttachStdin || config.AttachStdout || config.AttachStderr) {
return "", errors.Wrapf(define.ErrInvalidArg, "cannot specify streams to attach to when exec session has a pseudoterminal")
}

// Verify that we are in a good state to continue
if !c.ensureState(define.ContainerStateRunning) {
Expand Down Expand Up @@ -247,34 +246,12 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS

logrus.Infof("Going to start container %s exec session %s and attach to it", c.ID(), session.ID())

// TODO: check logic here - should we set Privileged if the container is
// privileged?
var capList []string
if session.Config.Privileged || c.config.Privileged {
capList = capabilities.AllCapabilities()
}

user := c.config.User
if session.Config.User != "" {
user = session.Config.User
}

if err := c.createExecBundle(session.ID()); err != nil {
opts, err := prepareForExec(c, session)
if err != nil {
return err
}

opts := new(ExecOptions)
opts.Cmd = session.Config.Command
opts.CapAdd = capList
opts.Env = session.Config.Environment
opts.Terminal = session.Config.Terminal
opts.Cwd = session.Config.WorkDir
opts.User = user
opts.Streams = streams
opts.PreserveFDs = session.Config.PreserveFDs
opts.DetachKeys = session.Config.DetachKeys

pid, attachChan, err := c.ociRuntime.ExecContainer(c, session.ID(), opts)
pid, attachChan, err := c.ociRuntime.ExecContainer(c, session.ID(), opts, streams)
if err != nil {
return err
}
Expand Down Expand Up @@ -318,28 +295,124 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS
c.lock.Lock()
}

// Sync the container to pick up state changes
if err := c.syncContainer(); err != nil {
if err := writeExecExitCode(c, session.ID(), exitCode); err != nil {
if lastErr != nil {
logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
}
return errors.Wrapf(err, "error syncing container %s state to remove exec session %s", c.ID(), session.ID())
lastErr = err
}

// Update status
// Since we did a syncContainer, the old session has been overwritten.
// Grab a fresh one from the database.
session, ok = c.state.ExecSessions[sessionID]
// Clean up after ourselves
if err := c.cleanupExecBundle(session.ID()); err != nil {
if lastErr != nil {
logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
}
lastErr = err
}

return lastErr
}

// ExecHTTPStartAndAttach starts and performs an HTTP attach to an exec session.
func (c *Container) ExecHTTPStartAndAttach(sessionID string, httpCon net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) (deferredErr error) {
// TODO: How do we combine streams with the default streams set in the exec session?

// The flow here is somewhat strange, because we need to determine if
// there's a terminal ASAP (for error handling).
// Until we know, assume it's true (don't add standard stream headers).
// Add a defer to ensure our invariant (HTTP session is closed) is
// maintained.
isTerminal := true
defer func() {
hijackWriteErrorAndClose(deferredErr, c.ID(), isTerminal, httpCon, httpBuf)
}()

if !c.batched {
c.lock.Lock()
defer c.lock.Unlock()

if err := c.syncContainer(); err != nil {
return err
}
}

session, ok := c.state.ExecSessions[sessionID]
if !ok {
// Exec session already removed.
logrus.Infof("Container %s exec session %s already removed from database", c.ID(), sessionID)
return nil
return errors.Wrapf(define.ErrNoSuchExecSession, "container %s has no exec session with ID %s", c.ID(), sessionID)
}
session.State = define.ExecStateStopped
session.ExitCode = exitCode
session.PID = 0
// We can now finally get the real value of isTerminal.
isTerminal = session.Config.Terminal

// Verify that we are in a good state to continue
if !c.ensureState(define.ContainerStateRunning) {
return errors.Wrapf(define.ErrCtrStateInvalid, "can only start exec sessions when their container is running")
}

if session.State != define.ExecStateCreated {
return errors.Wrapf(define.ErrExecSessionStateInvalid, "can only start created exec sessions, while container %s session %s state is %q", c.ID(), session.ID(), session.State.String())
}

logrus.Infof("Going to start container %s exec session %s and attach to it", c.ID(), session.ID())

execOpts, err := prepareForExec(c, session)
if err != nil {
return err
}

if streams == nil {
streams = new(HTTPAttachStreams)
streams.Stdin = session.Config.AttachStdin
streams.Stdout = session.Config.AttachStdout
streams.Stderr = session.Config.AttachStderr
}

pid, attachChan, err := c.ociRuntime.ExecContainerHTTP(c, session.ID(), execOpts, httpCon, httpBuf, streams, cancel)
if err != nil {
return err
}

// TODO: Investigate whether more of this can be made common with
// ExecStartAndAttach

c.newContainerEvent(events.Exec)
logrus.Debugf("Successfully started exec session %s in container %s", session.ID(), c.ID())

var lastErr error

session.PID = pid
session.State = define.ExecStateRunning

if err := c.save(); err != nil {
lastErr = err
}

// Unlock so other processes can use the container
if !c.batched {
c.lock.Unlock()
}

tmpErr := <-attachChan
if lastErr != nil {
logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
}
lastErr = tmpErr

exitCode, err := c.readExecExitCode(session.ID())
if err != nil {
if lastErr != nil {
logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
}
lastErr = err
}

logrus.Debugf("Container %s exec session %s completed with exit code %d", c.ID(), session.ID(), exitCode)

// Lock again
if !c.batched {
c.lock.Lock()
}

if err := writeExecExitCode(c, session.ID(), exitCode); err != nil {
if lastErr != nil {
logrus.Errorf("Container %s exec session %s error: %v", c.ID(), session.ID(), lastErr)
}
Expand All @@ -357,12 +430,6 @@ func (c *Container) ExecStartAndAttach(sessionID string, streams *define.AttachS
return lastErr
}

// ExecHTTPStartAndAttach starts and performs an HTTP attach to an exec session.
func (c *Container) ExecHTTPStartAndAttach(sessionID string) error {
// Will be implemented in part 2, migrating Start.
return define.ErrNotImplemented
}

// ExecStop stops an exec session in the container.
// If a timeout is provided, it will be used; otherwise, the timeout will
// default to the stop timeout of the container.
Expand Down Expand Up @@ -814,3 +881,67 @@ func (c *Container) removeAllExecSessions() error {

return lastErr
}

// Make an ExecOptions struct to start the OCI runtime and prepare its exec
// bundle.
func prepareForExec(c *Container, session *ExecSession) (*ExecOptions, error) {
// TODO: check logic here - should we set Privileged if the container is
// privileged?
var capList []string
if session.Config.Privileged || c.config.Privileged {
capList = capabilities.AllCapabilities()
}

user := c.config.User
if session.Config.User != "" {
user = session.Config.User
}

if err := c.createExecBundle(session.ID()); err != nil {
return nil, err
}

opts := new(ExecOptions)
opts.Cmd = session.Config.Command
opts.CapAdd = capList
opts.Env = session.Config.Environment
opts.Terminal = session.Config.Terminal
opts.Cwd = session.Config.WorkDir
opts.User = user
opts.PreserveFDs = session.Config.PreserveFDs
opts.DetachKeys = session.Config.DetachKeys

return opts, nil
}

// Write an exec session's exit code to the database
func writeExecExitCode(c *Container, sessionID string, exitCode int) error {
// We can't reuse the old exec session (things may have changed from
// under use, the container was unlocked).
// So re-sync and get a fresh copy.
// If we can't do this, no point in continuing, any attempt to save
// would write garbage to the DB.
if err := c.syncContainer(); err != nil {
if errors.Cause(err) == define.ErrNoSuchCtr || errors.Cause(err) == define.ErrCtrRemoved {
// Container's entirely removed. We can't save status,
// but the container's entirely removed, so we don't
// need to. Exit without error.
return nil
}
return errors.Wrapf(err, "error syncing container %s state to remove exec session %s", c.ID(), sessionID)
}

session, ok := c.state.ExecSessions[sessionID]
if !ok {
// Exec session already removed.
logrus.Infof("Container %s exec session %s already removed from database", c.ID(), sessionID)
return nil
}

session.State = define.ExecStateStopped
session.ExitCode = exitCode
session.PID = 0

// Finally, save our changes.
return c.save()
}
8 changes: 8 additions & 0 deletions libpod/container_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,6 +1011,14 @@ func (c *Container) init(ctx context.Context, retainRetries bool) error {

logrus.Debugf("Created container %s in OCI runtime", c.ID())

// Remove any exec sessions leftover from a potential prior run.
if len(c.state.ExecSessions) > 0 {
if err := c.runtime.state.RemoveContainerExecSessions(c); err != nil {
logrus.Errorf("Error removing container %s exec sessions from DB: %v", c.ID(), err)
}
c.state.ExecSessions = make(map[string]*ExecSession)
}

c.state.ExitCode = 0
c.state.Exited = false
c.state.State = define.ContainerStateCreated
Expand Down
15 changes: 12 additions & 3 deletions libpod/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ type OCIRuntime interface {
// the attach session to be terminated if provided via the STDIN
// channel. If they are not provided, the default detach keys will be
// used instead. Detach keys of "" will disable detaching via keyboard.
// The streams parameter may be passed for containers that did not
// create a terminal and will determine which streams to forward to the
// The streams parameter will determine which streams to forward to the
// client.
HTTPAttach(ctr *Container, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, detachKeys *string, cancel <-chan bool) error
// AttachResize resizes the terminal in use by the given container.
Expand All @@ -71,7 +70,17 @@ type OCIRuntime interface {
// ExecContainer executes a command in a running container.
// Returns an int (exit code), error channel (errors from attach), and
// error (errors that occurred attempting to start the exec session).
ExecContainer(ctr *Container, sessionID string, options *ExecOptions) (int, chan error, error)
// This returns once the exec session is running - not once it has
// completed, as one might expect. The attach session will remain
// running, in a goroutine that will return via the chan error in the
// return signature.
ExecContainer(ctr *Container, sessionID string, options *ExecOptions, streams *define.AttachStreams) (int, chan error, error)
// ExecContainerHTTP executes a command in a running container and
// attaches its standard streams to a provided hijacked HTTP session.
// Maintains the same invariants as ExecContainer (returns on session
// start, with a goroutine running in the background to handle attach).
// The HTTP attach itself maintains the same invariants as HTTPAttach.
ExecContainerHTTP(ctr *Container, sessionID string, options *ExecOptions, httpConn net.Conn, httpBuf *bufio.ReadWriter, streams *HTTPAttachStreams, cancel <-chan bool) (int, chan error, error)
// ExecAttachResize resizes the terminal of a running exec session. Only
// allowed with sessions that were created with a TTY.
ExecAttachResize(ctr *Container, sessionID string, newSize remotecommand.TerminalSize) error
Expand Down
Loading

0 comments on commit 343ab99

Please sign in to comment.