Skip to content

Commit

Permalink
Merge pull request #10431 from vrothberg/journald-logs
Browse files Browse the repository at this point in the history
journald logger: fix race condition
  • Loading branch information
openshift-merge-robot authored May 26, 2021
2 parents ac94be3 + 10569c9 commit 5b4ffc7
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 117 deletions.
276 changes: 162 additions & 114 deletions libpod/container_log_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ package libpod
import (
"context"
"fmt"
"io"
"math"
"strings"
"time"

"github.com/containers/podman/v3/libpod/define"
"github.com/containers/podman/v3/libpod/events"
"github.com/containers/podman/v3/libpod/logs"
journal "github.com/coreos/go-systemd/v22/sdjournal"
"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand All @@ -24,122 +22,187 @@ const (

// journaldLogErr is the journald priority signifying stderr
journaldLogErr = "3"

// bufLen is the length of the buffer to read from a k8s-file
// formatted log line
// let's set it as 2k just to be safe if k8s-file format ever changes
bufLen = 16384
)

func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
var config journal.JournalReaderConfig
if options.Tail < 0 {
config.NumFromTail = 0
} else if options.Tail == 0 {
config.NumFromTail = math.MaxUint64
} else {
config.NumFromTail = uint64(options.Tail)
journal, err := sdjournal.NewJournal()
if err != nil {
return err
}
if options.Multi {
config.Formatter = journalFormatterWithID
} else {
config.Formatter = journalFormatter
}
defaultTime := time.Time{}
if options.Since != defaultTime {
// coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future
// return nothing instead of falsely printing
if time.Now().Before(options.Since) {
return nil
}
// coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past
config.Since = -time.Since(options.Since)
// While logs are written to the `logChannel`, we inspect each event
// and stop once the container has died. Having logs and events in one
// stream prevents a race condition that we faced in #10323.

// Add the filters for events.
match := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
match = sdjournal.Match{Field: "PODMAN_ID", Value: c.ID()}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
config.Matches = append(config.Matches, journal.Match{
Field: "CONTAINER_ID_FULL",
Value: c.ID(),
})
options.WaitGroup.Add(1)

r, err := journal.NewJournalReader(config)
if err != nil {
// Add the filter for logs. Note the disjunction so that we match
// either the events or the logs.
if err := journal.AddDisjunction(); err != nil {
return errors.Wrap(err, "adding filter disjunction to journald logger")
}
match = sdjournal.Match{Field: "CONTAINER_ID_FULL", Value: c.ID()}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}

if err := journal.SeekHead(); err != nil {
return err
}
if r == nil {
return errors.Errorf("journal reader creation failed")
// API requires Next() immediately after SeekHead().
if _, err := journal.Next(); err != nil {
return errors.Wrap(err, "initial journal cursor")
}
if options.Tail == math.MaxInt64 {
r.Rewind()

// API requires a next|prev before getting a cursor.
if _, err := journal.Previous(); err != nil {
return errors.Wrap(err, "initial journal cursor")
}
state, err := c.State()
if err != nil {
return err

// Note that the initial cursor may not yet be ready, so we'll do an
// exponential backoff.
var cursor string
var cursorError error
for i := 1; i <= 3; i++ {
cursor, cursorError = journal.GetCursor()
if err != nil {
continue
}
time.Sleep(time.Duration(i*100) * time.Millisecond)
break
}
if cursorError != nil {
return errors.Wrap(cursorError, "inital journal cursor")
}

// We need the container's events in the same journal to guarantee
// consistency, see #10323.
if options.Follow && c.runtime.config.Engine.EventsLogger != "journald" {
return errors.Errorf("using --follow with the journald --log-driver but without the journald --events-backend (%s) is not supported", c.runtime.config.Engine.EventsLogger)
}

if options.Follow && state == define.ContainerStateRunning {
go func() {
done := make(chan bool)
until := make(chan time.Time)
go func() {
select {
case <-ctx.Done():
until <- time.Time{}
case <-done:
// nothing to do anymore
options.WaitGroup.Add(1)
go func() {
defer func() {
options.WaitGroup.Done()
if err := journal.Close(); err != nil {
logrus.Errorf("Unable to close journal: %v", err)
}
}()

afterTimeStamp := false // needed for options.Since
tailQueue := []*logs.LogLine{} // needed for options.Tail
doTail := options.Tail > 0
for {
select {
case <-ctx.Done():
// Remote client may have closed/lost the connection.
return
default:
// Fallthrough
}

if _, err := journal.Next(); err != nil {
logrus.Errorf("Failed to move journal cursor to next entry: %v", err)
return
}
latestCursor, err := journal.GetCursor()
if err != nil {
logrus.Errorf("Failed to get journal cursor: %v", err)
return
}

// Hit the end of the journal.
if cursor == latestCursor {
if doTail {
// Flush *once* we hit the end of the journal.
startIndex := int64(len(tailQueue)-1) - options.Tail
if startIndex < 0 {
startIndex = 0
}
for i := startIndex; i < int64(len(tailQueue)); i++ {
logChannel <- tailQueue[i]
}
tailQueue = nil
doTail = false
}
// Unless we follow, quit.
if !options.Follow {
return
}
}()
go func() {
// FIXME (#10323): we are facing a terrible
// race condition here. At the time the
// container dies and `c.Wait()` has returned,
// we may not have received all journald logs.
// So far there is no other way than waiting
// for a second. Ultimately, `r.Follow` is
// racy and we may have to implement our custom
// logic here.
c.Wait(ctx)
time.Sleep(time.Second)
until <- time.Time{}
}()
follower := journaldFollowBuffer{logChannel, options.Multi}
err := r.Follow(until, follower)
// Sleep until something's happening on the journal.
journal.Wait(sdjournal.IndefiniteWait)
continue
}
cursor = latestCursor

entry, err := journal.GetEntry()
if err != nil {
logrus.Debugf(err.Error())
logrus.Errorf("Failed to get journal entry: %v", err)
return
}
r.Close()
options.WaitGroup.Done()
done <- true
return
}()
return nil
}

go func() {
bytes := make([]byte, bufLen)
// /me complains about no do-while in go
ec, err := r.Read(bytes)
for ec != 0 && err == nil {
// because we are reusing bytes, we need to make
// sure the old data doesn't get into the new line
bytestr := string(bytes[:ec])
logLine, err2 := logs.NewJournaldLogLine(bytestr, options.Multi)
if err2 != nil {
logrus.Error(err2)
if !afterTimeStamp {
entryTime := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
if entryTime.Before(options.Since) {
continue
}
afterTimeStamp = true
}

// If we're reading an event and the container exited/died,
// then we're done and can return.
event, ok := entry.Fields["PODMAN_EVENT"]
if ok {
status, err := events.StringToStatus(event)
if err != nil {
logrus.Errorf("Failed to translate event: %v", err)
return
}
if status == events.Exited {
return
}
continue
}

var message string
var formatError error

if options.Multi {
message, formatError = journalFormatterWithID(entry)
} else {
message, formatError = journalFormatter(entry)
}

if formatError != nil {
logrus.Errorf("Failed to parse journald log entry: %v", err)
return
}

logLine, err := logs.NewJournaldLogLine(message, options.Multi)
if err != nil {
logrus.Errorf("Failed parse log line: %v", err)
return
}
if doTail {
tailQueue = append(tailQueue, logLine)
continue
}
logChannel <- logLine
ec, err = r.Read(bytes)
}
if err != nil && err != io.EOF {
logrus.Error(err)
}
r.Close()
options.WaitGroup.Done()
}()

return nil
}

func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
func journalFormatterWithID(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
Expand All @@ -162,7 +225,7 @@ func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
return output, nil
}

func journalFormatter(entry *journal.JournalEntry) (string, error) {
func journalFormatter(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
Expand All @@ -176,7 +239,7 @@ func journalFormatter(entry *journal.JournalEntry) (string, error) {
return output, nil
}

func formatterPrefix(entry *journal.JournalEntry) (string, error) {
func formatterPrefix(entry *sdjournal.JournalEntry) (string, error) {
usec := entry.RealtimeTimestamp
tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat)
output := fmt.Sprintf("%s ", tsString)
Expand All @@ -202,7 +265,7 @@ func formatterPrefix(entry *journal.JournalEntry) (string, error) {
return output, nil
}

func formatterMessage(entry *journal.JournalEntry) (string, error) {
func formatterMessage(entry *sdjournal.JournalEntry) (string, error) {
// Finally, append the message
msg, ok := entry.Fields["MESSAGE"]
if !ok {
Expand All @@ -211,18 +274,3 @@ func formatterMessage(entry *journal.JournalEntry) (string, error) {
msg = strings.TrimSuffix(msg, "\n")
return msg, nil
}

type journaldFollowBuffer struct {
logChannel chan *logs.LogLine
withID bool
}

func (f journaldFollowBuffer) Write(p []byte) (int, error) {
bytestr := string(p)
logLine, err := logs.NewJournaldLogLine(bytestr, f.withID)
if err != nil {
return -1, err
}
f.logChannel <- logLine
return len(p), nil
}
2 changes: 1 addition & 1 deletion test/e2e/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var _ = Describe("Podman logs", func() {
})

It("podman logs on a created container should result in 0 exit code: "+log, func() {
session := podmanTest.Podman([]string{"create", "-t", "--name", "log", ALPINE})
session := podmanTest.Podman([]string{"create", "--log-driver", log, "-t", "--name", "log", ALPINE})
session.WaitWithDefaultTimeout()
Expect(session).To(Exit(0))

Expand Down
Loading

0 comments on commit 5b4ffc7

Please sign in to comment.