Skip to content

Commit

Permalink
Implement top streaming for containers and pods
Browse files Browse the repository at this point in the history
* Implement API query parameter stream and delay for containers and
  pods top endpoints
* Update swagger with breaking changes
* Add python API tests for endpoints

Fixes #12115

Signed-off-by: Jhon Honce <[email protected]>
  • Loading branch information
jwhonce committed Nov 2, 2021
1 parent 3147ff8 commit 449cc7a
Show file tree
Hide file tree
Showing 6 changed files with 237 additions and 65 deletions.
81 changes: 62 additions & 19 deletions pkg/api/handlers/compat/containers_top.go
Original file line number Diff line number Diff line change
@@ -1,63 +1,106 @@
package compat

import (
"encoding/json"
"fmt"
"net/http"
"strings"
"time"

"github.com/containers/podman/v3/libpod"
"github.com/containers/podman/v3/pkg/api/handlers"
"github.com/containers/podman/v3/pkg/api/handlers/utils"
api "github.com/containers/podman/v3/pkg/api/types"
"github.com/gorilla/schema"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

func TopContainer(w http.ResponseWriter, r *http.Request) {
runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime)
decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder)

defaultValue := "-ef"
psArgs := "-ef"
if utils.IsLibpodRequest(r) {
defaultValue = ""
psArgs = ""
}
query := struct {
Delay int `schema:"delay"`
PsArgs string `schema:"ps_args"`
Stream bool `schema:"stream"`
}{
PsArgs: defaultValue,
Delay: 5,
PsArgs: psArgs,
}
if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
errors.Wrapf(err, "failed to parse parameters for %s", r.URL.String()))
return
}

if query.Delay < 1 {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
fmt.Errorf("\"delay\" parameter of value %d < 1", query.Delay))
return
}

name := utils.GetName(r)
c, err := runtime.LookupContainer(name)
if err != nil {
utils.ContainerNotFound(w, name, err)
return
}

output, err := c.Top([]string{query.PsArgs})
if err != nil {
utils.InternalServerError(w, err)
return
// We are committed now - all errors logged but not reported to client, ship has sailed
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
if f, ok := w.(http.Flusher); ok {
f.Flush()
}

var body = handlers.ContainerTopOKBody{}
if len(output) > 0 {
body.Titles = strings.Split(output[0], "\t")
for i := range body.Titles {
body.Titles[i] = strings.TrimSpace(body.Titles[i])
}
encoder := json.NewEncoder(w)

loop: // break out of for/select infinite` loop
for {
select {
case <-r.Context().Done():
break loop
default:
output, err := c.Top([]string{query.PsArgs})
if err != nil {
logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
break loop
}

if len(output) > 0 {
body := handlers.ContainerTopOKBody{}
body.Titles = strings.Split(output[0], "\t")
for i := range body.Titles {
body.Titles[i] = strings.TrimSpace(body.Titles[i])
}

for _, line := range output[1:] {
process := strings.Split(line, "\t")
for i := range process {
process[i] = strings.TrimSpace(process[i])
}
body.Processes = append(body.Processes, process)
}

if err := encoder.Encode(body); err != nil {
logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
break loop
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}

for _, line := range output[1:] {
process := strings.Split(line, "\t")
for i := range process {
process[i] = strings.TrimSpace(process[i])
if query.Stream {
time.Sleep(time.Duration(query.Delay) * time.Second)
} else {
break loop
}
body.Processes = append(body.Processes, process)
}
}
utils.WriteJSON(w, http.StatusOK, body)
}
82 changes: 65 additions & 17 deletions pkg/api/handlers/libpod/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"strings"
"time"

"github.com/containers/common/pkg/config"
"github.com/containers/podman/v3/libpod"
Expand Down Expand Up @@ -363,42 +364,89 @@ func PodTop(w http.ResponseWriter, r *http.Request) {
runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime)
decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder)

psArgs := "-ef"
if utils.IsLibpodRequest(r) {
psArgs = ""
}
query := struct {
Delay int `schema:"delay"`
PsArgs string `schema:"ps_args"`
Stream bool `schema:"stream"`
}{
PsArgs: "",
Delay: 5,
PsArgs: psArgs,
}
if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
errors.Wrapf(err, "failed to parse parameters for %s", r.URL.String()))
return
}

name := utils.GetName(r)
pod, err := runtime.LookupPod(name)
if err != nil {
utils.PodNotFound(w, name, err)
if query.Delay < 1 {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
fmt.Errorf("\"delay\" parameter of value %d < 1", query.Delay))
return
}

args := []string{}
if query.PsArgs != "" {
args = append(args, query.PsArgs)
}
output, err := pod.GetPodPidInformation(args)
name := utils.GetName(r)
pod, err := runtime.LookupPod(name)
if err != nil {
utils.InternalServerError(w, err)
utils.PodNotFound(w, name, err)
return
}

var body = handlers.PodTopOKBody{}
if len(output) > 0 {
body.Titles = strings.Split(output[0], "\t")
for _, line := range output[1:] {
body.Processes = append(body.Processes, strings.Split(line, "\t"))
// We are committed now - all errors logged but not reported to client, ship has sailed
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
if f, ok := w.(http.Flusher); ok {
f.Flush()
}

encoder := json.NewEncoder(w)

loop: // break out of for/select infinite` loop
for {
select {
case <-r.Context().Done():
break loop
default:
output, err := pod.GetPodPidInformation([]string{query.PsArgs})
if err != nil {
logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
break loop
}

if len(output) > 0 {
var body = handlers.PodTopOKBody{}
body.Titles = strings.Split(output[0], "\t")
for i := range body.Titles {
body.Titles[i] = strings.TrimSpace(body.Titles[i])
}

for _, line := range output[1:] {
process := strings.Split(line, "\t")
for i := range process {
process[i] = strings.TrimSpace(process[i])
}
body.Processes = append(body.Processes, process)
}

if err := encoder.Encode(body); err != nil {
logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
break loop
}
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}

if query.Stream {
time.Sleep(time.Duration(query.Delay) * time.Second)
} else {
break loop
}
}
}
utils.WriteJSON(w, http.StatusOK, body)
}

func PodKill(w http.ResponseWriter, r *http.Request) {
Expand Down
17 changes: 11 additions & 6 deletions pkg/api/server/register_containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// - in: query
// name: ps_args
// type: string
// default: -ef
// description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// produces:
// - application/json
Expand Down Expand Up @@ -1142,19 +1143,23 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// name: name
// type: string
// required: true
// description: |
// Name of container to query for processes
// (As of version 1.xx)
// description: Name of container to query for processes (As of version 1.xx)
// - in: query
// name: stream
// type: boolean
// default: true
// description: Stream the output
// description: when true, repeatedly stream the latest output (As of version 4.0)
// - in: query
// name: delay
// type: integer
// description: if streaming, delay in seconds between updates. Must be >1. (As of version 4.0)
// default: 5
// - in: query
// name: ps_args
// type: string
// default: -ef
// description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// description: |
// arguments to pass to ps such as aux.
// Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// produces:
// - application/json
// responses:
Expand Down
15 changes: 10 additions & 5 deletions pkg/api/server/register_pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,18 +296,23 @@ func (s *APIServer) registerPodsHandlers(r *mux.Router) error {
// name: name
// type: string
// required: true
// description: |
// Name of pod to query for processes
// description: Name of pod to query for processes
// - in: query
// name: stream
// type: boolean
// default: true
// description: Stream the output
// description: when true, repeatedly stream the latest output (As of version 4.0)
// - in: query
// name: delay
// type: integer
// description: if streaming, delay in seconds between updates. Must be >1. (As of version 4.0)
// default: 5
// - in: query
// name: ps_args
// type: string
// default: -ef
// description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// description: |
// arguments to pass to ps such as aux.
// Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// responses:
// 200:
// $ref: "#/responses/DocsPodTopResponse"
Expand Down
4 changes: 2 additions & 2 deletions test/apiv2/40-pods.at
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ t GET libpod/pods/fakename/top 404 \
.cause="no such pod"

t GET libpod/pods/foo/top 200 \
.Processes[0][-1]="/pause " \
.Processes[0][-1]="/pause" \
.Titles[-1]="COMMAND"

t GET libpod/pods/foo/top?ps_args=args,pid 200 \
.Processes[0][0]="/pause " \
.Processes[0][0]="/pause" \
.Processes[0][1]="1" \
.Titles[0]="COMMAND" \
.Titles[1]="PID" \
Expand Down
Loading

0 comments on commit 449cc7a

Please sign in to comment.