Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exec support using libpod api #87

Merged
merged 3 commits into from
Apr 2, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions api/exec_create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package api

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)

// ExecConfig contains the configuration of an exec session
type ExecConfig struct {
// Command the the command that will be invoked in the exec session.
// Must not be empty.
Command []string `json:"Cmd"`
// DetachKeys are keys that will be used to detach from the exec
// session.
DetachKeys string `json:"DetachKeys,omitempty"`
// Environment is a set of environment variables that will be set for
// the first process started by the exec session.
Environment map[string]string `json:"Env,omitempty"`
// The user, and optionally, group to run the exec process inside the container.
// Format is one of: user, user:group, uid, or uid:gid."
User string `json:"User,omitempty"`
// WorkDir is the working directory for the first process that will be
// launched by the exec session.
// If set to "" the exec session will be started in / within the
// container.
WorkDir string `json:"WorkingDir,omitempty"`
// Tty is whether the exec session will allocate a pseudoterminal.
Tty bool `json:"Tty,omitempty"`
// AttachStdin is whether the STDIN stream will be forwarded to the exec
// session's first process when attaching. Only available if Terminal is
// false.
AttachStdin bool `json:"AttachStdin,omitempty"`
// AttachStdout is whether the STDOUT stream will be forwarded to the
// exec session's first process when attaching. Only available if
// Terminal is false.
AttachStdout bool `json:"AttachStdout,omitempty"`
// AttachStderr is whether the STDERR stream will be forwarded to the
// exec session's first process when attaching. Only available if
// Terminal is false.
AttachStderr bool `json:"AttachStderr,omitempty"`
// Privileged is whether the exec session will be privileged - that is,
// will be granted additional capabilities.
Privileged bool `json:"Privileged,omitempty"`
}

// ExecSessionResponse contains the ID of a newly created exec session
type ExecSessionResponse struct {
ID string
}

// ExecCreate creates an exec session to run a command inside a running container
func (c *API) ExecCreate(ctx context.Context, name string, config ExecConfig) (string, error) {

jsonString, err := json.Marshal(config)
if err != nil {
return "", err
}

res, err := c.Post(ctx, fmt.Sprintf("/v1.0.0/libpod/containers/%s/exec", name), bytes.NewBuffer(jsonString))
if err != nil {
return "", err
}

defer res.Body.Close()

if res.StatusCode != http.StatusCreated {
body, _ := ioutil.ReadAll(res.Body)
return "", fmt.Errorf("unknown error, status code: %d: %s", res.StatusCode, body)
}

body, err := ioutil.ReadAll(res.Body)
if err != nil {
return "", err
}
execResponse := &ExecSessionResponse{}
err = json.Unmarshal(body, execResponse)
if err != nil {
return "", err
}
return execResponse.ID, err
}
36 changes: 36 additions & 0 deletions api/exec_inspect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package api

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)

// ExecInspect returns low-level information about an exec instance.
func (c *API) ExecInspect(ctx context.Context, sessionId string) (InspectExecSession, error) {

var inspectData InspectExecSession

res, err := c.Get(ctx, fmt.Sprintf("/v1.0.0/libpod/exec/%s/json", sessionId))
if err != nil {
return inspectData, err
}

defer res.Body.Close()

if res.StatusCode != http.StatusOK {
return inspectData, fmt.Errorf("unknown error, status code: %d", res.StatusCode)
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return inspectData, err
}
err = json.Unmarshal(body, &inspectData)
if err != nil {
return inspectData, err
}

return inspectData, nil
}
25 changes: 25 additions & 0 deletions api/exec_resize.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package api

import (
"context"
"fmt"
"io/ioutil"
"net/http"
)

func (c *API) ExecResize(ctx context.Context, execId string, height int, width int) error {

res, err := c.Post(ctx, fmt.Sprintf("/v1.0.0/libpod/exec/%s/resize?h=%d&w=%d", execId, height, width), nil)
if err != nil {
return err
}

defer res.Body.Close()

if res.StatusCode != http.StatusCreated {
body, _ := ioutil.ReadAll(res.Body)
return fmt.Errorf("unknown error, status code: %d: %s", res.StatusCode, body)
}

return err
}
218 changes: 218 additions & 0 deletions api/exec_start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package api

import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"time"

"github.com/hashicorp/nomad/plugins/drivers"
)

// ExecStartRequest prepares to stream a exec session
type ExecStartRequest struct {

// streams
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer

// terminal size channel
ResizeCh <-chan drivers.TerminalSize

// Tty indicates whether pseudo-terminal is to be allocated
Tty bool

// AttachOutput is whether to attach to STDOUT
// If false, stdout will not be attached
AttachOutput bool
// AttachError is whether to attach to STDERR
// If false, stdout will not be attached
AttachError bool
// AttachInput is whether to attach to STDIN
// If false, stdout will not be attached
AttachInput bool
}

// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel
func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) {
n, err := io.ReadFull(r, buffer[0:8])
if err != nil {
return
}
if n < 8 {
err = io.ErrUnexpectedEOF
return
}

fd = int(buffer[0])
if fd < 0 || fd > 3 {
err = fmt.Errorf(`channel "%d" found, 0-3 supported`, fd)
return
}

sz = int(binary.BigEndian.Uint32(buffer[4:8]))
return
}

// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel
func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) {
if len(buffer) < length {
buffer = append(buffer, make([]byte, length-len(buffer)+1)...)
}

n, err := io.ReadFull(r, buffer[0:length])
if err != nil {
return nil, nil
}
if n < length {
err = io.ErrUnexpectedEOF
return
}

return buffer[0:length], nil
}

// This is intended to be run as a goroutine, handling resizing for a container
// or exec session.
func (c *API) attachHandleResize(ctx context.Context, resizeChannel <-chan drivers.TerminalSize, sessionId string) {
for {
select {
case <-ctx.Done():
c.logger.Warn("Resize handler is done")
Copy link
Contributor

@drewbailey drewbailey Mar 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if an operator would find this something that they need to act upon or be notified about, Trace seems better

Suggested change
c.logger.Warn("Resize handler is done")
c.logger.Trace("Resize handler is done")

return
case size := <-resizeChannel:
c.logger.Trace("Resize terminal", "sessionId", sessionId, "height", size.Height, "width", size.Width)
rerr := c.ExecResize(ctx, sessionId, size.Height, size.Width)
if rerr != nil {
c.logger.Warn("failed to resize TTY", "err", rerr)
towe75 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

// ExecStartAndAttach starts and attaches to a given exec session.
func (c *API) ExecStart(ctx context.Context, sessionID string, options ExecStartRequest) error {
client := new(http.Client)
*client = *c.httpClient
client.Timeout = 0

var socket net.Conn
socketSet := false
dialContext := client.Transport.(*http.Transport).DialContext
t := &http.Transport{
DialContext: func(ctx context.Context, network, address string) (net.Conn, error) {
c, err := dialContext(ctx, network, address)
if err != nil {
return nil, err
}
if !socketSet {
socket = c
socketSet = true
}
return c, err
},
IdleConnTimeout: time.Duration(0),
}
client.Transport = t

// Detach is always false.
// podman reference doc states that "true" is not supported
execStartReq := struct {
Detach bool `json:"Detach"`
}{
Detach: false,
}
jsonBytes, err := json.Marshal(execStartReq)
if err != nil {
return err
}
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/exec/%s/start", c.baseUrl, sessionID), bytes.NewBuffer(jsonBytes))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
res, err := client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return err
}

if options.Tty {
go c.attachHandleResize(ctx, options.ResizeCh, sessionID)
}

if options.AttachInput {
go func() {
_, err := io.Copy(socket, options.Stdin)
if err != nil {
c.logger.Error("Failed to send stdin to exec session", "err", err)
}
}()
}

defer func() {
c.logger.Debug("Finish exec session attach")
}()

buffer := make([]byte, 1024)
if options.Tty {
// If not multiplex'ed, read from server and write to stdout
_, err := io.Copy(options.Stdout, socket)
if err != nil {
return err
}
} else {
towe75 marked this conversation as resolved.
Show resolved Hide resolved
for {
// Read multiplexed channels and write to appropriate stream
fd, l, err := DemuxHeader(socket, buffer)
if err != nil {
if errors.Is(err, io.EOF) {
return nil
}
return err
}
frame, err := DemuxFrame(socket, buffer, l)
if err != nil {
return err
}

switch {
case fd == 0:
// Write STDIN to STDOUT (echoing characters
// typed by another attach session)
if options.AttachInput {
if _, err := options.Stdout.Write(frame[0:l]); err != nil {
return err
}
}
case fd == 1:
if options.AttachOutput {
if _, err := options.Stdout.Write(frame[0:l]); err != nil {
return err
}
}
case fd == 2:
if options.AttachError {
if _, err := options.Stderr.Write(frame[0:l]); err != nil {
return err
}
}
case fd == 3:
return fmt.Errorf("error from service from stream: %s", frame)
default:
return fmt.Errorf("unrecognized channel '%d' in header, 0-3 supported", fd)
}
}
}
return nil
}
2 changes: 1 addition & 1 deletion api/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ type InspectContainerData struct {
// InspectExecSession contains information about a given exec session.
type InspectExecSession struct {
// ProcessConfig contains information about the exec session's process.
ProcessConfig *InspectExecProcess `json:"ProcessConfig"`
// ProcessConfig *InspectExecProcess `json:"ProcessConfig"`
// ContainerID is the ID of the container this exec session is attached
// to.
ContainerID string `json:"ContainerID"`
Expand Down
Loading