Skip to content

Commit

Permalink
event backend journald: fix problem with empty journal
Browse files Browse the repository at this point in the history
Currently podman events will just fail with `Error: failed to get journal
cursor: failed to get cursor: cannot assign requested address` when the
journal contains zero podman events.

The problem is that we are using the journal accessors wrong. There is no
need to call GetCursor() and compare them manually. The Next() return an
integer which tells if it moved to the next or not. This means the we can
remove GetCursor() which would fail when there is no entry.

This also includes another bug fix. Previously the logic called Next()
twice for the first entry which caused us to miss the first entry.

To reproduce this issue you can run the following commands:
```
sudo journalctl --rotate
sudo journalctl --vacuum-time=1s
```
Note that this will delete the full journal.

Now run podman events and it fails but with this patch it works.
Now generate a single event, i.e. podman pull alpine, and run
podman events --until 1s.

I am not sure how to get a reliable test into CI, I really do not want
to delete the journal and developer or CI systems.

Fixes second part of containers#15688

Signed-off-by: Paul Holzinger <[email protected]>
  • Loading branch information
Luap99 committed Sep 12, 2022
1 parent 138b09c commit 76980a2
Showing 1 changed file with 53 additions and 47 deletions.
100 changes: 53 additions & 47 deletions libpod/events/journal_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,57 +112,16 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
}
}

// the api requires a next|prev before getting a cursor
if _, err := j.Next(); err != nil {
return fmt.Errorf("failed to move journal cursor to next entry: %w", err)
}

prevCursor, err := j.GetCursor()
if err != nil {
return fmt.Errorf("failed to get journal cursor: %w", err)
}
for {
select {
case <-ctx.Done():
// the consumer has cancelled
return nil
default:
// fallthrough
}

if _, err := j.Next(); err != nil {
return fmt.Errorf("failed to move journal cursor to next entry: %w", err)
}
newCursor, err := j.GetCursor()
entry, err := getNextEntry(ctx, j, options.Stream, untilTime)
if err != nil {
return fmt.Errorf("failed to get journal cursor: %w", err)
return err
}
if prevCursor == newCursor {
if !options.Stream || (len(options.Until) > 0 && time.Now().After(untilTime)) {
break
}

// j.Wait() is blocking, this would cause the goroutine to hang forever
// if no more journal entries are generated and thus if the client
// has closed the connection in the meantime to leak memory.
// Waiting only 5 seconds makes sure we can check if the client closed in the
// meantime at least every 5 seconds.
t := 5 * time.Second
if len(options.Until) > 0 {
until := time.Until(untilTime)
if until < t {
t = until
}
}
_ = j.Wait(t)
continue
// no entry == we hit the end
if entry == nil {
return nil
}
prevCursor = newCursor

entry, err := j.GetEntry()
if err != nil {
return fmt.Errorf("failed to read journal entry: %w", err)
}
newEvent, err := newEventFromJournalEntry(entry)
if err != nil {
// We can't decode this event.
Expand All @@ -177,7 +136,6 @@ func (e EventJournalD) Read(ctx context.Context, options ReadOptions) error {
options.EventChannel <- newEvent
}
}
return nil
}

func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {
Expand Down Expand Up @@ -238,3 +196,51 @@ func newEventFromJournalEntry(entry *sdjournal.JournalEntry) (*Event, error) {
func (e EventJournalD) String() string {
return Journald.String()
}

// getNextEntry returns the next entry in the journal. If the end of the
// journal is reached and stream is not set or the current time is after
// the until time this function return nil,nil.
func getNextEntry(ctx context.Context, j *sdjournal.Journal, stream bool, untilTime time.Time) (*sdjournal.JournalEntry, error) {
for {
select {
case <-ctx.Done():
// the consumer has cancelled
return nil, nil
default:
// fallthrough
}
// the api requires a next|prev before reading the event
ret, err := j.Next()
if err != nil {
return nil, fmt.Errorf("failed to move journal cursor to next entry: %w", err)
}
// ret == 0 equals EOF, see sd_journal_next(3)
if ret == 0 {
if !stream || (!untilTime.IsZero() && time.Now().After(untilTime)) {
// we hit the end and should not keep streaming
return nil, nil
}
// keep waiting for the next entry
// j.Wait() is blocking, this would cause the goroutine to hang forever
// if no more journal entries are generated and thus if the client
// has closed the connection in the meantime to leak memory.
// Waiting only 5 seconds makes sure we can check if the client closed in the
// meantime at least every 5 seconds.
t := 5 * time.Second
if !untilTime.IsZero() {
until := time.Until(untilTime)
if until < t {
t = until
}
}
_ = j.Wait(t)
continue
}

entry, err := j.GetEntry()
if err != nil {
return nil, fmt.Errorf("failed to read journal entry: %w", err)
}
return entry, nil
}
}

0 comments on commit 76980a2

Please sign in to comment.