Skip to content

Commit

Permalink
Merge pull request containers#24406 from Luap99/event-api-response
Browse files Browse the repository at this point in the history
fix API issue about missing the status code in the events and logs endpoints
  • Loading branch information
openshift-merge-bot[bot] authored Nov 4, 2024
2 parents 2da164c + e6d9878 commit 0f25d9e
Show file tree
Hide file tree
Showing 18 changed files with 213 additions and 284 deletions.
57 changes: 24 additions & 33 deletions cmd/podman/system/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/containers/podman/v5/cmd/podman/validate"
"github.com/containers/podman/v5/libpod/events"
"github.com/containers/podman/v5/pkg/domain/entities"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -139,9 +140,8 @@ func eventsCmd(cmd *cobra.Command, _ []string) error {
if len(eventOptions.Since) > 0 || len(eventOptions.Until) > 0 {
eventOptions.FromStart = true
}
eventChannel := make(chan *events.Event, 1)
eventChannel := make(chan events.ReadResult, 1)
eventOptions.EventChan = eventChannel
errChannel := make(chan error)

var (
rpt *report.Formatter
Expand All @@ -161,40 +161,31 @@ func eventsCmd(cmd *cobra.Command, _ []string) error {
}
}

go func() {
errChannel <- registry.ContainerEngine().Events(context.Background(), eventOptions)
close(errChannel)
}()

for {
select {
case event, ok := <-eventChannel:
if !ok {
// channel was closed we can exit
// read the error channel blocking to make sure we are not missing any errors (#23165)
return <-errChannel
}
switch {
case doJSON:
e := newEventFromLibpodEvent(event)
jsonStr, err := e.ToJSONString()
if err != nil {
return err
}
fmt.Println(jsonStr)
case cmd.Flags().Changed("format"):
if err := rpt.Execute(newEventFromLibpodEvent(event)); err != nil {
return err
}
default:
fmt.Println(event.ToHumanReadable(!noTrunc))
}
case err := <-errChannel:
// only exit in case of an error,
// otherwise keep reading events until the event channel is closed
err := registry.ContainerEngine().Events(context.Background(), eventOptions)
if err != nil {
return err
}

for evt := range eventChannel {
if evt.Error != nil {
logrus.Errorf("Failed to read event: %v", evt.Error)
continue
}
switch {
case doJSON:
e := newEventFromLibpodEvent(evt.Event)
jsonStr, err := e.ToJSONString()
if err != nil {
return err
}
fmt.Println(jsonStr)
case cmd.Flags().Changed("format"):
if err := rpt.Execute(newEventFromLibpodEvent(evt.Event)); err != nil {
return err
}
default:
fmt.Println(evt.Event.ToHumanReadable(!noTrunc))
}
}
return nil
}
17 changes: 3 additions & 14 deletions libpod/container_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"time"

"github.com/containers/podman/v5/libpod/define"
"github.com/containers/podman/v5/libpod/events"
"github.com/containers/podman/v5/libpod/logs"
systemdDefine "github.com/containers/podman/v5/pkg/systemd/define"
"github.com/nxadm/tail"
Expand Down Expand Up @@ -139,20 +138,10 @@ func (c *Container) readFromLogFile(ctx context.Context, options *logs.LogOption

// The container is running, so we need to wait until the container exited
go func() {
eventChannel := make(chan *events.Event)
eventOptions := events.ReadOptions{
EventChannel: eventChannel,
Filters: []string{"event=died", "container=" + c.ID()},
Stream: true,
_, err = c.Wait(ctx)
if err != nil && !errors.Is(err, define.ErrNoSuchCtr) {
logrus.Errorf("Waiting for container to exit: %v", err)
}
go func() {
if err := c.runtime.Events(ctx, eventOptions); err != nil {
logrus.Errorf("Waiting for container to exit: %v", err)
}
}()
// Now wait for the died event and signal to finish
// reading the log until EOF.
<-eventChannel
// Make sure to wait at least for the poll duration
// before stopping the file logger (see #10675).
time.Sleep(watch.POLL_DURATION)
Expand Down
49 changes: 12 additions & 37 deletions libpod/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"context"
"fmt"
"path/filepath"
"sync"

"github.com/containers/podman/v5/libpod/define"
"github.com/containers/podman/v5/libpod/events"
Expand Down Expand Up @@ -187,53 +186,29 @@ func (r *Runtime) Events(ctx context.Context, options events.ReadOptions) error

// GetEvents reads the event log and returns events based on input filters
func (r *Runtime) GetEvents(ctx context.Context, filters []string) ([]*events.Event, error) {
eventChannel := make(chan *events.Event)
eventChannel := make(chan events.ReadResult)
options := events.ReadOptions{
EventChannel: eventChannel,
Filters: filters,
FromStart: true,
Stream: false,
}

logEvents := make([]*events.Event, 0, len(eventChannel))
readLock := sync.Mutex{}
readLock.Lock()
go func() {
for e := range eventChannel {
logEvents = append(logEvents, e)
}
readLock.Unlock()
}()

readErr := r.eventer.Read(ctx, options)
readLock.Lock() // Wait for the events to be consumed.
return logEvents, readErr
}

// GetLastContainerEvent takes a container name or ID and an event status and returns
// the last occurrence of the container event
func (r *Runtime) GetLastContainerEvent(ctx context.Context, nameOrID string, containerEvent events.Status) (*events.Event, error) {
// FIXME: events should be read in reverse order!
// https://github.com/containers/podman/issues/14579

// check to make sure the event.Status is valid
if _, err := events.StringToStatus(containerEvent.String()); err != nil {
return nil, err
}
filters := []string{
fmt.Sprintf("container=%s", nameOrID),
fmt.Sprintf("event=%s", containerEvent),
"type=container",
}
containerEvents, err := r.GetEvents(ctx, filters)
err := r.eventer.Read(ctx, options)
if err != nil {
return nil, err
}
if len(containerEvents) < 1 {
return nil, fmt.Errorf("%s not found: %w", containerEvent.String(), events.ErrEventNotFound)

logEvents := make([]*events.Event, 0, len(eventChannel))
for evt := range eventChannel {
// we ignore any error here, this is only used on the backup
// GetExecDiedEvent() died path as best effort anyway
if evt.Error == nil {
logEvents = append(logEvents, evt.Event)
}
}
// return the last element in the slice
return containerEvents[len(containerEvents)-1], nil

return logEvents, nil
}

// GetExecDiedEvent takes a container name or ID, exec session ID, and returns
Expand Down
9 changes: 6 additions & 3 deletions libpod/events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ const (
Journald EventerType = iota
// Null is a no-op events logger. It does not read or write events.
Null EventerType = iota
// Memory indicates the event logger will hold events in memory
Memory EventerType = iota
)

// Event describes the attributes of a libpod event
Expand Down Expand Up @@ -87,10 +85,15 @@ type Eventer interface {
String() string
}

type ReadResult struct {
Event *Event
Error error
}

// ReadOptions describe the attributes needed to read event logs
type ReadOptions struct {
// EventChannel is the comm path back to user
EventChannel chan *Event
EventChannel chan ReadResult
// Filters are key/value pairs that describe to limit output
Filters []string
// FromStart means you start reading from the start of the logs
Expand Down
4 changes: 0 additions & 4 deletions libpod/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ func (et EventerType) String() string {
return "file"
case Journald:
return "journald"
case Memory:
return "memory"
case Null:
return "none"
default:
Expand All @@ -36,8 +34,6 @@ func IsValidEventer(eventer string) bool {
return true
case Journald.String():
return true
case Memory.String():
return true
case Null.String():
return true
default:
Expand Down
2 changes: 0 additions & 2 deletions libpod/events/events_freebsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ func NewEventer(options EventerOptions) (Eventer, error) {
return EventLogFile{options}, nil
case strings.ToUpper(Null.String()):
return newNullEventer(), nil
case strings.ToUpper(Memory.String()):
return NewMemoryEventer(), nil
default:
return nil, fmt.Errorf("unknown event logger type: %s", strings.ToUpper(options.EventerType))
}
Expand Down
2 changes: 0 additions & 2 deletions libpod/events/events_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ func NewEventer(options EventerOptions) (Eventer, error) {
return newLogFileEventer(options)
case strings.ToUpper(Null.String()):
return newNullEventer(), nil
case strings.ToUpper(Memory.String()):
return NewMemoryEventer(), nil
default:
return nil, fmt.Errorf("unknown event logger type: %s", strings.ToUpper(options.EventerType))
}
Expand Down
66 changes: 40 additions & 26 deletions libpod/events/journal_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"runtime"
"strconv"
"time"

Expand Down Expand Up @@ -97,8 +98,9 @@ func (e EventJournalD) Write(ee Event) error {
}

// Read reads events from the journal and sends qualified events to the event channel
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
defer close(options.EventChannel)
func (e EventJournalD) Read(ctx context.Context, options ReadOptions) (retErr error) {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
filterMap, err := generateEventFilters(options.Filters, options.Since, options.Until)
if err != nil {
return fmt.Errorf("failed to parse event filters: %w", err)
Expand All @@ -117,13 +119,15 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
return err
}
defer func() {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
if retErr != nil {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
}
}
}()
err = j.SetDataThreshold(0)
if err != nil {
logrus.Warnf("cannot set data threshold: %v", err)
return fmt.Errorf("cannot set data threshold for journal: %v", err)
}
// match only podman journal entries
podmanJournal := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
Expand Down Expand Up @@ -158,30 +162,40 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
}
}

for {
entry, err := GetNextEntry(ctx, j, options.Stream, untilTime)
if err != nil {
return err
}
// no entry == we hit the end
if entry == nil {
return nil
}
go func() {
defer close(options.EventChannel)
defer func() {
if err := j.Close(); err != nil {
logrus.Errorf("Unable to close journal :%v", err)
}
}()
for {
entry, err := GetNextEntry(ctx, j, options.Stream, untilTime)
if err != nil {
options.EventChannel <- ReadResult{Error: err}
break
}
// no entry == we hit the end
if entry == nil {
break
}

newEvent, err := newEventFromJournalEntry(entry)
if err != nil {
// We can't decode this event.
// Don't fail hard - that would make events unusable.
// Instead, log and continue.
if !errors.Is(err, ErrEventTypeBlank) {
logrus.Errorf("Unable to decode event: %v", err)
newEvent, err := newEventFromJournalEntry(entry)
if err != nil {
// We can't decode this event.
// Don't fail hard - that would make events unusable.
// Instead, log and continue.
if !errors.Is(err, ErrEventTypeBlank) {
options.EventChannel <- ReadResult{Error: fmt.Errorf("unable to decode event: %v", err)}
}
continue
}
if applyFilters(newEvent, filterMap) {
options.EventChannel <- ReadResult{Event: newEvent}
}
continue
}
if applyFilters(newEvent, filterMap) {
options.EventChannel <- newEvent
}
}
}()
return nil
}

func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {
Expand Down
Loading

0 comments on commit 0f25d9e

Please sign in to comment.