Skip to content

Commit

Permalink
track podman events in kube play
Browse files Browse the repository at this point in the history
Signed-off-by: Gunjan Vyas <[email protected]>
  • Loading branch information
vyasgun committed May 31, 2023
1 parent b7d4da6 commit 41d05b9
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 3 deletions.
2 changes: 2 additions & 0 deletions cmd/podman/kube/play.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func playFlags(cmd *cobra.Command) {
flags.BoolVar(&playOptions.TLSVerifyCLI, "tls-verify", true, "Require HTTPS and verify certificates when contacting registries")
flags.BoolVar(&playOptions.StartCLI, "start", true, "Start the pod after creating it")
flags.BoolVar(&playOptions.Force, "force", false, "Remove volumes as part of --down")
flags.BoolVar(&playOptions.PrintProgress, "print-progress", false, "Print progress")
flags.MarkHidden("print-progress")

authfileFlagName := "authfile"
flags.StringVar(&playOptions.Authfile, authfileFlagName, auth.GetDefaultAuthFile(), "Path of the authentication file. Use REGISTRY_AUTH_FILE environment variable to override")
Expand Down
10 changes: 10 additions & 0 deletions libpod/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type Runtime struct {
noStore bool
// secretsManager manages secrets
secretsManager *secrets.SecretsManager

eventListener func(chan *events.Event, chan error, string)
}

// SetXdgDirs ensures the XDG_RUNTIME_DIR env and XDG_CONFIG_HOME variables are set.
Expand Down Expand Up @@ -1188,3 +1190,11 @@ func (r *Runtime) RemoteURI() string {
func (r *Runtime) SetRemoteURI(uri string) {
r.config.Engine.RemoteURI = uri
}

func (r *Runtime) SetEventListener(listener func(chan *events.Event, chan error, string)) {
r.eventListener = listener
}

func (r *Runtime) GetEventListener() func(chan *events.Event, chan error, string) {
return r.eventListener
}
50 changes: 50 additions & 0 deletions pkg/api/handlers/libpod/kube.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package libpod

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"time"

"github.com/containers/image/v5/types"
"github.com/containers/podman/v4/libpod"
"github.com/containers/podman/v4/libpod/events"
"github.com/containers/podman/v4/pkg/api/handlers/utils"
api "github.com/containers/podman/v4/pkg/api/types"
"github.com/containers/podman/v4/pkg/auth"
Expand All @@ -31,6 +35,7 @@ func KubePlay(w http.ResponseWriter, r *http.Request) {
PublishPorts []string `schema:"publishPorts"`
Wait bool `schema:"wait"`
ServiceContainer bool `schema:"serviceContainer"`
PrintProgress bool `schema:"printProgress`
}{
TLSVerify: true,
Start: true,
Expand Down Expand Up @@ -100,14 +105,59 @@ func KubePlay(w http.ResponseWriter, r *http.Request) {
PublishPorts: query.PublishPorts,
Wait: query.Wait,
ServiceContainer: query.ServiceContainer,
PrintProgress: query.PrintProgress,
}
if _, found := r.URL.Query()["tlsVerify"]; found {
options.SkipTLSVerify = types.NewOptionalBool(!query.TLSVerify)
}
if _, found := r.URL.Query()["start"]; found {
options.Start = types.NewOptionalBool(query.Start)
}

var ctx context.Context
var cancel context.CancelFunc
if query.PrintProgress {
ctx, cancel = context.WithCancel(r.Context())
containerEngine.Libpod.SetEventListener(func(eventChan chan *events.Event, errChan chan error, filter string) {

go func() {
errChan <- containerEngine.Events(r.Context(), entities.EventsOptions{EventChan: eventChan, Filter: []string{filter}})
}()
go func() {
flush := func() {
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
enc := json.NewEncoder(w)
enc.SetEscapeHTML(true)
for {
select {
case <-ctx.Done():
return
default:
select {
case event, ok := <-eventChan:
if !ok {
return
}
enc.Encode(entities.KubePlayReport{Stream: event.ToHumanReadable(true)})
flush()
time.Sleep(time.Second)
case <-errChan:
return
default:
// non-blocking call
}
}
}
}()
})
}
report, err := containerEngine.PlayKube(r.Context(), r.Body, options)
if query.PrintProgress {
cancel()
}
if err != nil {
utils.Error(w, http.StatusInternalServerError, fmt.Errorf("playing YAML file: %w", err))
return
Expand Down
25 changes: 23 additions & 2 deletions pkg/bindings/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package kube
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -81,8 +84,26 @@ func PlayWithBody(ctx context.Context, body io.Reader, options *PlayOptions) (*e
}
defer response.Body.Close()

if err := response.Process(&report); err != nil {
return nil, err
dec := json.NewDecoder(response.Body.(io.Reader))

for {

var s entities.KubePlayReport
if err := dec.Decode(&s); err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) {
return nil, fmt.Errorf("server probably quit: %w", err)
}

if errors.Is(err, io.EOF) {
break
}
return &entities.KubePlayReport{}, fmt.Errorf("decoding stream: %w", err)
}

report = s
if s.Stream != "" {
fmt.Fprintln(os.Stdout, s.Stream)
}
}

return &report, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/bindings/kube/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type PlayOptions struct {
// // Wait - indicates whether to return after having created the pods
Wait *bool
ServiceContainer *bool
PrintProgress *bool
}

// ApplyOptions are optional options for applying kube YAML files to a k8s cluster
Expand Down
15 changes: 15 additions & 0 deletions pkg/bindings/kube/types_play_options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pkg/domain/entities/play.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type PlayKubeOptions struct {
// PublishPorts - configure how to expose ports configured inside the K8S YAML file
PublishPorts []string
// Wait - indicates whether to return after having created the pods
Wait bool
Wait bool
PrintProgress bool
}

// PlayKubePod represents a single pod and associated containers created by play kube
Expand Down Expand Up @@ -105,6 +106,7 @@ type PlayKubeReport struct {
ServiceContainerID string
// If set, exit with the specified exit code.
ExitCode *int32
Stream string
}

type KubePlayReport = PlayKubeReport
Expand Down
47 changes: 47 additions & 0 deletions pkg/domain/infra/abi/play.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"strings"
"sync"

"github.com/containers/podman/v4/libpod/events"
metav1 "github.com/containers/podman/v4/pkg/k8s.io/apimachinery/pkg/apis/meta/v1"

buildahDefine "github.com/containers/buildah/define"
"github.com/containers/common/libimage"
nettypes "github.com/containers/common/libnetwork/types"
Expand Down Expand Up @@ -186,6 +189,37 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
}
}()

var eventFilters []string
for _, document := range documentList {
kind, err := getKubeKind(document)
if err != nil {
return nil, fmt.Errorf("unable to read kube YAML: %w", err)
}
name, err := getKubeName(document)
if err != nil {
return nil, fmt.Errorf("unable to read kube YAML: %w", err)
}
switch strings.ToLower(kind) {
case "pod":
kind = "pod"
case "deployment":
kind = "pod"
name = fmt.Sprintf("%s-pod", name)
case "persistentvolumeclaim", "configmap", "secret":
kind = "volume"
}
eventFilters = append(eventFilters, fmt.Sprintf("%s=%s", kind, name))
}

// Start an event listener for each filter
if listener := ic.Libpod.GetEventListener(); listener != nil {
for _, filter := range eventFilters {
errChan := make(chan error)
eventChan := make(chan *events.Event)
go listener(eventChan, errChan, filter)
}
}

// create pod on each document if it is a pod or deployment
// any other kube kind will be skipped
for _, document := range documentList {
Expand Down Expand Up @@ -1237,6 +1271,19 @@ func getKubeKind(obj []byte) (string, error) {
return kubeObject.Kind, nil
}

// getKubeName unmarshalls a kube YAML document and returns its name.
func getKubeName(obj []byte) (string, error) {
var kubeObject struct {
metav1.ObjectMeta `json:"metadata"`
}

if err := yaml.Unmarshal(obj, &kubeObject); err != nil {
return "", err
}

return kubeObject.Name, nil
}

// sortKubeKinds adds the correct creation order for the kube kinds.
// Any pod dependency will be created first like volumes, secrets, etc.
func sortKubeKinds(documentList [][]byte) ([][]byte, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/infra/tunnel/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, opts en
options.WithStart(start == types.OptionalBoolTrue)
}
options.WithPublishPorts(opts.PublishPorts)
options.WithPrintProgress(opts.PrintProgress)
return play.KubeWithBody(ic.ClientCtx, body, options)
}

Expand Down

0 comments on commit 41d05b9

Please sign in to comment.