From d3c2186ac8b46300050c18d3adff5a39aaacfb7b Mon Sep 17 00:00:00 2001 From: Thomas Weber Date: Fri, 2 Apr 2021 19:22:40 +0200 Subject: [PATCH] Exec support using libpod api (#87) --- api/exec_create.go | 85 +++++++++++++++++ api/exec_inspect.go | 36 ++++++++ api/exec_resize.go | 25 +++++ api/exec_start.go | 218 ++++++++++++++++++++++++++++++++++++++++++++ api/structs.go | 2 +- driver.go | 113 ++++++++++++++++++++++- go.mod | 1 + 7 files changed, 476 insertions(+), 4 deletions(-) create mode 100644 api/exec_create.go create mode 100644 api/exec_inspect.go create mode 100644 api/exec_resize.go create mode 100644 api/exec_start.go diff --git a/api/exec_create.go b/api/exec_create.go new file mode 100644 index 00000000..1e5e3847 --- /dev/null +++ b/api/exec_create.go @@ -0,0 +1,85 @@ +package api + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" +) + +// ExecConfig contains the configuration of an exec session +type ExecConfig struct { + // Command the the command that will be invoked in the exec session. + // Must not be empty. + Command []string `json:"Cmd"` + // DetachKeys are keys that will be used to detach from the exec + // session. + DetachKeys string `json:"DetachKeys,omitempty"` + // Environment is a set of environment variables that will be set for + // the first process started by the exec session. + Environment map[string]string `json:"Env,omitempty"` + // The user, and optionally, group to run the exec process inside the container. + // Format is one of: user, user:group, uid, or uid:gid." + User string `json:"User,omitempty"` + // WorkDir is the working directory for the first process that will be + // launched by the exec session. + // If set to "" the exec session will be started in / within the + // container. + WorkDir string `json:"WorkingDir,omitempty"` + // Tty is whether the exec session will allocate a pseudoterminal. + Tty bool `json:"Tty,omitempty"` + // AttachStdin is whether the STDIN stream will be forwarded to the exec + // session's first process when attaching. Only available if Terminal is + // false. + AttachStdin bool `json:"AttachStdin,omitempty"` + // AttachStdout is whether the STDOUT stream will be forwarded to the + // exec session's first process when attaching. Only available if + // Terminal is false. + AttachStdout bool `json:"AttachStdout,omitempty"` + // AttachStderr is whether the STDERR stream will be forwarded to the + // exec session's first process when attaching. Only available if + // Terminal is false. + AttachStderr bool `json:"AttachStderr,omitempty"` + // Privileged is whether the exec session will be privileged - that is, + // will be granted additional capabilities. + Privileged bool `json:"Privileged,omitempty"` +} + +// ExecSessionResponse contains the ID of a newly created exec session +type ExecSessionResponse struct { + ID string +} + +// ExecCreate creates an exec session to run a command inside a running container +func (c *API) ExecCreate(ctx context.Context, name string, config ExecConfig) (string, error) { + + jsonString, err := json.Marshal(config) + if err != nil { + return "", err + } + + res, err := c.Post(ctx, fmt.Sprintf("/v1.0.0/libpod/containers/%s/exec", name), bytes.NewBuffer(jsonString)) + if err != nil { + return "", err + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusCreated { + body, _ := ioutil.ReadAll(res.Body) + return "", fmt.Errorf("unknown error, status code: %d: %s", res.StatusCode, body) + } + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return "", err + } + execResponse := &ExecSessionResponse{} + err = json.Unmarshal(body, execResponse) + if err != nil { + return "", err + } + return execResponse.ID, err +} diff --git a/api/exec_inspect.go b/api/exec_inspect.go new file mode 100644 index 00000000..0aef415a --- /dev/null +++ b/api/exec_inspect.go @@ -0,0 +1,36 @@ +package api + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" +) + +// ExecInspect returns low-level information about an exec instance. +func (c *API) ExecInspect(ctx context.Context, sessionId string) (InspectExecSession, error) { + + var inspectData InspectExecSession + + res, err := c.Get(ctx, fmt.Sprintf("/v1.0.0/libpod/exec/%s/json", sessionId)) + if err != nil { + return inspectData, err + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return inspectData, fmt.Errorf("unknown error, status code: %d", res.StatusCode) + } + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return inspectData, err + } + err = json.Unmarshal(body, &inspectData) + if err != nil { + return inspectData, err + } + + return inspectData, nil +} diff --git a/api/exec_resize.go b/api/exec_resize.go new file mode 100644 index 00000000..aefd3a62 --- /dev/null +++ b/api/exec_resize.go @@ -0,0 +1,25 @@ +package api + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" +) + +func (c *API) ExecResize(ctx context.Context, execId string, height int, width int) error { + + res, err := c.Post(ctx, fmt.Sprintf("/v1.0.0/libpod/exec/%s/resize?h=%d&w=%d", execId, height, width), nil) + if err != nil { + return err + } + + defer res.Body.Close() + + if res.StatusCode != http.StatusCreated { + body, _ := ioutil.ReadAll(res.Body) + return fmt.Errorf("unknown error, status code: %d: %s", res.StatusCode, body) + } + + return err +} diff --git a/api/exec_start.go b/api/exec_start.go new file mode 100644 index 00000000..289341d6 --- /dev/null +++ b/api/exec_start.go @@ -0,0 +1,218 @@ +package api + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "io" + "net" + "net/http" + "time" + + "github.com/hashicorp/nomad/plugins/drivers" +) + +// ExecStartRequest prepares to stream a exec session +type ExecStartRequest struct { + + // streams + Stdin io.Reader + Stdout io.Writer + Stderr io.Writer + + // terminal size channel + ResizeCh <-chan drivers.TerminalSize + + // Tty indicates whether pseudo-terminal is to be allocated + Tty bool + + // AttachOutput is whether to attach to STDOUT + // If false, stdout will not be attached + AttachOutput bool + // AttachError is whether to attach to STDERR + // If false, stdout will not be attached + AttachError bool + // AttachInput is whether to attach to STDIN + // If false, stdout will not be attached + AttachInput bool +} + +// DemuxHeader reads header for stream from server multiplexed stdin/stdout/stderr/2nd error channel +func DemuxHeader(r io.Reader, buffer []byte) (fd, sz int, err error) { + n, err := io.ReadFull(r, buffer[0:8]) + if err != nil { + return + } + if n < 8 { + err = io.ErrUnexpectedEOF + return + } + + fd = int(buffer[0]) + if fd < 0 || fd > 3 { + err = fmt.Errorf(`channel "%d" found, 0-3 supported`, fd) + return + } + + sz = int(binary.BigEndian.Uint32(buffer[4:8])) + return +} + +// DemuxFrame reads contents for frame from server multiplexed stdin/stdout/stderr/2nd error channel +func DemuxFrame(r io.Reader, buffer []byte, length int) (frame []byte, err error) { + if len(buffer) < length { + buffer = append(buffer, make([]byte, length-len(buffer)+1)...) + } + + n, err := io.ReadFull(r, buffer[0:length]) + if err != nil { + return nil, nil + } + if n < length { + err = io.ErrUnexpectedEOF + return + } + + return buffer[0:length], nil +} + +// This is intended to be run as a goroutine, handling resizing for a container +// or exec session. +func (c *API) attachHandleResize(ctx context.Context, resizeChannel <-chan drivers.TerminalSize, sessionId string) { + for { + select { + case <-ctx.Done(): + c.logger.Trace("Resize handler is done") + return + case size := <-resizeChannel: + c.logger.Trace("Resize terminal", "sessionId", sessionId, "height", size.Height, "width", size.Width) + rerr := c.ExecResize(ctx, sessionId, size.Height, size.Width) + if rerr != nil { + c.logger.Error("Failed to resize TTY", "err", rerr) + } + } + } +} + +// ExecStartAndAttach starts and attaches to a given exec session. +func (c *API) ExecStart(ctx context.Context, sessionID string, options ExecStartRequest) error { + client := new(http.Client) + *client = *c.httpClient + client.Timeout = 0 + + var socket net.Conn + socketSet := false + dialContext := client.Transport.(*http.Transport).DialContext + t := &http.Transport{ + DialContext: func(ctx context.Context, network, address string) (net.Conn, error) { + c, err := dialContext(ctx, network, address) + if err != nil { + return nil, err + } + if !socketSet { + socket = c + socketSet = true + } + return c, err + }, + IdleConnTimeout: time.Duration(0), + } + client.Transport = t + + // Detach is always false. + // podman reference doc states that "true" is not supported + execStartReq := struct { + Detach bool `json:"Detach"` + }{ + Detach: false, + } + jsonBytes, err := json.Marshal(execStartReq) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/exec/%s/start", c.baseUrl, sessionID), bytes.NewBuffer(jsonBytes)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + res, err := client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK { + return err + } + + if options.Tty { + go c.attachHandleResize(ctx, options.ResizeCh, sessionID) + } + + if options.AttachInput { + go func() { + _, err := io.Copy(socket, options.Stdin) + if err != nil { + c.logger.Error("Failed to send stdin to exec session", "err", err) + } + }() + } + + defer func() { + c.logger.Debug("Finish exec session attach") + }() + + buffer := make([]byte, 1024) + if options.Tty { + // If not multiplex'ed, read from server and write to stdout + _, err := io.Copy(options.Stdout, socket) + if err != nil { + return err + } + } else { + for { + // Read multiplexed channels and write to appropriate stream + fd, l, err := DemuxHeader(socket, buffer) + if err != nil { + if errors.Is(err, io.EOF) { + return nil + } + return err + } + frame, err := DemuxFrame(socket, buffer, l) + if err != nil { + return err + } + + switch { + case fd == 0: + // Write STDIN to STDOUT (echoing characters + // typed by another attach session) + if options.AttachInput { + if _, err := options.Stdout.Write(frame[0:l]); err != nil { + return err + } + } + case fd == 1: + if options.AttachOutput { + if _, err := options.Stdout.Write(frame[0:l]); err != nil { + return err + } + } + case fd == 2: + if options.AttachError { + if _, err := options.Stderr.Write(frame[0:l]); err != nil { + return err + } + } + case fd == 3: + return fmt.Errorf("error from service from stream: %s", frame) + default: + return fmt.Errorf("unrecognized channel '%d' in header, 0-3 supported", fd) + } + } + } + return nil +} diff --git a/api/structs.go b/api/structs.go index fc1d3314..f145d8dd 100644 --- a/api/structs.go +++ b/api/structs.go @@ -1193,7 +1193,7 @@ type InspectContainerData struct { // InspectExecSession contains information about a given exec session. type InspectExecSession struct { // ProcessConfig contains information about the exec session's process. - ProcessConfig *InspectExecProcess `json:"ProcessConfig"` + // ProcessConfig *InspectExecProcess `json:"ProcessConfig"` // ContainerID is the ID of the container this exec session is attached // to. ContainerID string `json:"ContainerID"` diff --git a/driver.go b/driver.go index e4a96617..ced97181 100644 --- a/driver.go +++ b/driver.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/armon/circbuf" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/go-hclog" @@ -54,7 +55,7 @@ var ( // optional features this driver supports capabilities = &drivers.Capabilities{ SendSignals: true, - Exec: false, + Exec: true, FSIsolation: drivers.FSIsolationImage, NetIsolationModes: []drivers.NetIsolationMode{ drivers.NetIsolationModeGroup, @@ -753,9 +754,115 @@ func (d *Driver) SignalTask(taskID string, signal string) error { return d.podman.ContainerKill(d.ctx, handle.containerID, signal) } -// ExecTask function is used by the Nomad client to execute commands inside the task execution context. +// ExecTask function is used by the Nomad client to execute scripted health checks inside the task execution context. func (d *Driver) ExecTask(taskID string, cmd []string, timeout time.Duration) (*drivers.ExecTaskResult, error) { - return nil, fmt.Errorf("Podman driver does not support exec") + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + createRequest := api.ExecConfig{ + Command: cmd, + Tty: false, + AttachStdin: false, + AttachStdout: true, + AttachStderr: true, + } + ctx, cancel := context.WithTimeout(d.ctx, timeout) + defer cancel() + sessionId, err := d.podman.ExecCreate(ctx, handle.containerID, createRequest) + if err != nil { + d.logger.Error("Unable to create ExecTask session", "err", err) + return nil, err + } + stdout, err := circbuf.NewBuffer(int64(drivers.CheckBufSize)) + if err != nil { + d.logger.Error("ExecTask session failed, unable to allocate stdout buffer", "sessionId", sessionId, "err", err) + return nil, err + } + stderr, err := circbuf.NewBuffer(int64(drivers.CheckBufSize)) + if err != nil { + d.logger.Error("ExecTask session failed, unable to allocate stderr buffer", "sessionId", sessionId, "err", err) + return nil, err + } + startRequest := api.ExecStartRequest{ + Tty: false, + AttachInput: false, + AttachOutput: true, + Stdout: stdout, + AttachError: true, + Stderr: stderr, + } + err = d.podman.ExecStart(ctx, sessionId, startRequest) + if err != nil { + d.logger.Error("ExecTask session returned with error", "sessionId", sessionId, "err", err) + return nil, err + } + + inspectData, err := d.podman.ExecInspect(ctx, sessionId) + if err != nil { + d.logger.Error("Unable to inspect finished ExecTask session", "sessionId", sessionId, "err", err) + return nil, err + } + execResult := &drivers.ExecTaskResult{ + ExitResult: &drivers.ExitResult{ + ExitCode: inspectData.ExitCode, + }, + Stdout: stdout.Bytes(), + Stderr: stderr.Bytes(), + } + d.logger.Trace("ExecTask result", "code", execResult.ExitResult.ExitCode, "out", string(execResult.Stdout), "err", string(execResult.Stderr)) + + return execResult, nil +} + +// ExecTask function is used by the Nomad client to execute commands inside the task execution context. +// i.E. nomad alloc exec .... +func (d *Driver) ExecTaskStreaming(ctx context.Context, taskID string, execOptions *drivers.ExecOptions) (*drivers.ExitResult, error) { + handle, ok := d.tasks.Get(taskID) + if !ok { + return nil, drivers.ErrTaskNotFound + } + + createRequest := api.ExecConfig{ + Command: execOptions.Command, + Tty: execOptions.Tty, + AttachStdin: true, + AttachStdout: true, + AttachStderr: true, + } + + sessionId, err := d.podman.ExecCreate(ctx, handle.containerID, createRequest) + if err != nil { + d.logger.Error("Unable to create exec session", "err", err) + return nil, err + } + + startRequest := api.ExecStartRequest{ + Tty: execOptions.Tty, + AttachInput: createRequest.AttachStdin, + Stdin: execOptions.Stdin, + AttachOutput: createRequest.AttachStdout, + Stdout: execOptions.Stdout, + AttachError: createRequest.AttachStderr, + Stderr: execOptions.Stderr, + ResizeCh: execOptions.ResizeCh, + } + err = d.podman.ExecStart(ctx, sessionId, startRequest) + if err != nil { + d.logger.Error("Exec session returned with error", "sessionId", sessionId, "err", err) + return nil, err + } + + inspectData, err := d.podman.ExecInspect(ctx, sessionId) + if err != nil { + d.logger.Error("Unable to inspect finished exec session", "sessionId", sessionId, "err", err) + return nil, err + } + exitResult := drivers.ExitResult{ + ExitCode: inspectData.ExitCode, + Err: err, + } + return &exitResult, nil } func (d *Driver) containerMounts(task *drivers.TaskConfig, driverConfig *TaskConfig) ([]spec.Mount, error) { diff --git a/go.mod b/go.mod index f7ac34a1..bee24596 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ replace ( require ( github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect + github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e github.com/container-storage-interface/spec v1.2.0 // indirect github.com/containernetworking/plugins v0.8.5 // indirect github.com/distribution/distribution v2.7.1+incompatible // indirect