Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

track podman events in kube play #2

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/podman/kube/play.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ func playFlags(cmd *cobra.Command) {
flags.BoolVar(&playOptions.TLSVerifyCLI, "tls-verify", true, "Require HTTPS and verify certificates when contacting registries")
flags.BoolVar(&playOptions.StartCLI, "start", true, "Start the pod after creating it")
flags.BoolVar(&playOptions.Force, "force", false, "Remove volumes as part of --down")
flags.BoolVar(&playOptions.PrintProgress, "print-progress", false, "Print progress")
flags.MarkHidden("print-progress")

authfileFlagName := "authfile"
flags.StringVar(&playOptions.Authfile, authfileFlagName, auth.GetDefaultAuthFile(), "Path of the authentication file. Use REGISTRY_AUTH_FILE environment variable to override")
Expand Down
10 changes: 10 additions & 0 deletions libpod/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type Runtime struct {
noStore bool
// secretsManager manages secrets
secretsManager *secrets.SecretsManager

eventListener func(chan *events.Event, chan error, string)
}

// SetXdgDirs ensures the XDG_RUNTIME_DIR env and XDG_CONFIG_HOME variables are set.
Expand Down Expand Up @@ -1188,3 +1190,11 @@ func (r *Runtime) RemoteURI() string {
func (r *Runtime) SetRemoteURI(uri string) {
r.config.Engine.RemoteURI = uri
}

func (r *Runtime) SetEventListener(listener func(chan *events.Event, chan error, string)) {
r.eventListener = listener
}

func (r *Runtime) GetEventListener() func(chan *events.Event, chan error, string) {
return r.eventListener
}
50 changes: 50 additions & 0 deletions pkg/api/handlers/libpod/kube.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package libpod

import (
"context"
"encoding/json"
"fmt"
"net"
"net/http"
"time"

"github.com/containers/image/v5/types"
"github.com/containers/podman/v4/libpod"
"github.com/containers/podman/v4/libpod/events"
"github.com/containers/podman/v4/pkg/api/handlers/utils"
api "github.com/containers/podman/v4/pkg/api/types"
"github.com/containers/podman/v4/pkg/auth"
Expand All @@ -31,6 +35,7 @@ func KubePlay(w http.ResponseWriter, r *http.Request) {
PublishPorts []string `schema:"publishPorts"`
Wait bool `schema:"wait"`
ServiceContainer bool `schema:"serviceContainer"`
PrintProgress bool `schema:"printProgress`
}{
TLSVerify: true,
Start: true,
Expand Down Expand Up @@ -100,14 +105,59 @@ func KubePlay(w http.ResponseWriter, r *http.Request) {
PublishPorts: query.PublishPorts,
Wait: query.Wait,
ServiceContainer: query.ServiceContainer,
PrintProgress: query.PrintProgress,
}
if _, found := r.URL.Query()["tlsVerify"]; found {
options.SkipTLSVerify = types.NewOptionalBool(!query.TLSVerify)
}
if _, found := r.URL.Query()["start"]; found {
options.Start = types.NewOptionalBool(query.Start)
}

var ctx context.Context
var cancel context.CancelFunc
if query.PrintProgress {
ctx, cancel = context.WithCancel(r.Context())
containerEngine.Libpod.SetEventListener(func(eventChan chan *events.Event, errChan chan error, filter string) {

go func() {
errChan <- containerEngine.Events(r.Context(), entities.EventsOptions{EventChan: eventChan, Filter: []string{filter}})
}()
go func() {
flush := func() {
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
}
enc := json.NewEncoder(w)
enc.SetEscapeHTML(true)
for {
select {
case <-ctx.Done():
return
default:
select {
case event, ok := <-eventChan:
if !ok {
return
}
enc.Encode(entities.KubePlayReport{Stream: event.ToHumanReadable(true)})
flush()
time.Sleep(time.Second)
case <-errChan:
return
default:
// non-blocking call
}
}
}
}()
})
}
report, err := containerEngine.PlayKube(r.Context(), r.Body, options)
if query.PrintProgress {
cancel()
}
if err != nil {
utils.Error(w, http.StatusInternalServerError, fmt.Errorf("playing YAML file: %w", err))
return
Expand Down
25 changes: 23 additions & 2 deletions pkg/bindings/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package kube
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -81,8 +84,26 @@ func PlayWithBody(ctx context.Context, body io.Reader, options *PlayOptions) (*e
}
defer response.Body.Close()

if err := response.Process(&report); err != nil {
return nil, err
dec := json.NewDecoder(response.Body.(io.Reader))

for {

var s entities.KubePlayReport
if err := dec.Decode(&s); err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) {
return nil, fmt.Errorf("server probably quit: %w", err)
}

if errors.Is(err, io.EOF) {
break
}
return &entities.KubePlayReport{}, fmt.Errorf("decoding stream: %w", err)
}

report = s
if s.Stream != "" {
fmt.Fprintln(os.Stdout, s.Stream)
}
}

return &report, nil
Expand Down
1 change: 1 addition & 0 deletions pkg/bindings/kube/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type PlayOptions struct {
// // Wait - indicates whether to return after having created the pods
Wait *bool
ServiceContainer *bool
PrintProgress *bool
}

// ApplyOptions are optional options for applying kube YAML files to a k8s cluster
Expand Down
15 changes: 15 additions & 0 deletions pkg/bindings/kube/types_play_options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion pkg/domain/entities/play.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ type PlayKubeOptions struct {
// PublishPorts - configure how to expose ports configured inside the K8S YAML file
PublishPorts []string
// Wait - indicates whether to return after having created the pods
Wait bool
Wait bool
PrintProgress bool
}

// PlayKubePod represents a single pod and associated containers created by play kube
Expand Down Expand Up @@ -105,6 +106,7 @@ type PlayKubeReport struct {
ServiceContainerID string
// If set, exit with the specified exit code.
ExitCode *int32
Stream string
}

type KubePlayReport = PlayKubeReport
Expand Down
47 changes: 47 additions & 0 deletions pkg/domain/infra/abi/play.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"strings"
"sync"

"github.com/containers/podman/v4/libpod/events"
metav1 "github.com/containers/podman/v4/pkg/k8s.io/apimachinery/pkg/apis/meta/v1"

buildahDefine "github.com/containers/buildah/define"
"github.com/containers/common/libimage"
nettypes "github.com/containers/common/libnetwork/types"
Expand Down Expand Up @@ -186,6 +189,37 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, options
}
}()

var eventFilters []string
for _, document := range documentList {
kind, err := getKubeKind(document)
if err != nil {
return nil, fmt.Errorf("unable to read kube YAML: %w", err)
}
name, err := getKubeName(document)
if err != nil {
return nil, fmt.Errorf("unable to read kube YAML: %w", err)
}
switch strings.ToLower(kind) {
case "pod":
kind = "pod"
case "deployment":
kind = "pod"
name = fmt.Sprintf("%s-pod", name)
case "persistentvolumeclaim", "configmap", "secret":
kind = "volume"
}
eventFilters = append(eventFilters, fmt.Sprintf("%s=%s", kind, name))
}

// Start an event listener for each filter
if listener := ic.Libpod.GetEventListener(); listener != nil {
for _, filter := range eventFilters {
errChan := make(chan error)
eventChan := make(chan *events.Event)
go listener(eventChan, errChan, filter)
}
}

// create pod on each document if it is a pod or deployment
// any other kube kind will be skipped
for _, document := range documentList {
Expand Down Expand Up @@ -1237,6 +1271,19 @@ func getKubeKind(obj []byte) (string, error) {
return kubeObject.Kind, nil
}

// getKubeName unmarshalls a kube YAML document and returns its name.
func getKubeName(obj []byte) (string, error) {
var kubeObject struct {
metav1.ObjectMeta `json:"metadata"`
}

if err := yaml.Unmarshal(obj, &kubeObject); err != nil {
return "", err
}

return kubeObject.Name, nil
}

// sortKubeKinds adds the correct creation order for the kube kinds.
// Any pod dependency will be created first like volumes, secrets, etc.
func sortKubeKinds(documentList [][]byte) ([][]byte, error) {
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/infra/tunnel/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (ic *ContainerEngine) PlayKube(ctx context.Context, body io.Reader, opts en
options.WithStart(start == types.OptionalBoolTrue)
}
options.WithPublishPorts(opts.PublishPorts)
options.WithPrintProgress(opts.PrintProgress)
return play.KubeWithBody(ic.ClientCtx, body, options)
}

Expand Down