Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

journald logger: fix race condition #10431

Merged
merged 1 commit into from
May 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
276 changes: 162 additions & 114 deletions libpod/container_log_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,12 @@ package libpod
import (
"context"
"fmt"
"io"
"math"
"strings"
"time"

"github.com/containers/podman/v3/libpod/define"
"github.com/containers/podman/v3/libpod/events"
"github.com/containers/podman/v3/libpod/logs"
journal "github.com/coreos/go-systemd/v22/sdjournal"
"github.com/coreos/go-systemd/v22/sdjournal"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
Expand All @@ -24,122 +22,187 @@ const (

// journaldLogErr is the journald priority signifying stderr
journaldLogErr = "3"

// bufLen is the length of the buffer to read from a k8s-file
// formatted log line
// let's set it as 16k just to be safe if k8s-file format ever changes
bufLen = 16384
)

func (c *Container) readFromJournal(ctx context.Context, options *logs.LogOptions, logChannel chan *logs.LogLine) error {
var config journal.JournalReaderConfig
if options.Tail < 0 {
config.NumFromTail = 0
} else if options.Tail == 0 {
config.NumFromTail = math.MaxUint64
} else {
config.NumFromTail = uint64(options.Tail)
journal, err := sdjournal.NewJournal()
if err != nil {
return err
}
if options.Multi {
config.Formatter = journalFormatterWithID
} else {
config.Formatter = journalFormatter
}
defaultTime := time.Time{}
if options.Since != defaultTime {
// coreos/go-systemd/sdjournal doesn't correctly handle requests for data in the future
// return nothing instead of falsely printing
if time.Now().Before(options.Since) {
return nil
}
// coreos/go-systemd/sdjournal expects a negative time.Duration for times in the past
config.Since = -time.Since(options.Since)
vrothberg marked this conversation as resolved.
Show resolved Hide resolved
// While logs are written to the `logChannel`, we inspect each event
// and stop once the container has died. Having logs and events in one
// stream prevents a race condition that we faced in #10323.

// Add the filters for events.
match := sdjournal.Match{Field: "SYSLOG_IDENTIFIER", Value: "podman"}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
match = sdjournal.Match{Field: "PODMAN_ID", Value: c.ID()}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}
config.Matches = append(config.Matches, journal.Match{
Field: "CONTAINER_ID_FULL",
Value: c.ID(),
})
options.WaitGroup.Add(1)

r, err := journal.NewJournalReader(config)
if err != nil {
// Add the filter for logs. Note the disjunction so that we match
// either the events or the logs.
if err := journal.AddDisjunction(); err != nil {
return errors.Wrap(err, "adding filter disjunction to journald logger")
}
match = sdjournal.Match{Field: "CONTAINER_ID_FULL", Value: c.ID()}
if err := journal.AddMatch(match.String()); err != nil {
return errors.Wrapf(err, "adding filter to journald logger: %v", match)
}

if err := journal.SeekHead(); err != nil {
return err
}
if r == nil {
return errors.Errorf("journal reader creation failed")
// API requires Next() immediately after SeekHead().
if _, err := journal.Next(); err != nil {
return errors.Wrap(err, "initial journal cursor")
}
if options.Tail == math.MaxInt64 {
r.Rewind()

// API requires a next|prev before getting a cursor.
if _, err := journal.Previous(); err != nil {
return errors.Wrap(err, "initial journal cursor")
}
state, err := c.State()
if err != nil {
return err

// Note that the initial cursor may not yet be ready, so we'll do an
// exponential backoff.
var cursor string
var cursorError error
for i := 1; i <= 3; i++ {
cursor, cursorError = journal.GetCursor()
if err != nil {
continue
}
time.Sleep(time.Duration(i*100) * time.Millisecond)
break
}
if cursorError != nil {
return errors.Wrap(cursorError, "inital journal cursor")
}

// We need the container's events in the same journal to guarantee
// consistency, see #10323.
if options.Follow && c.runtime.config.Engine.EventsLogger != "journald" {
return errors.Errorf("using --follow with the journald --log-driver but without the journald --events-backend (%s) is not supported", c.runtime.config.Engine.EventsLogger)
}

if options.Follow && state == define.ContainerStateRunning {
go func() {
done := make(chan bool)
until := make(chan time.Time)
go func() {
select {
case <-ctx.Done():
until <- time.Time{}
case <-done:
// nothing to do anymore
options.WaitGroup.Add(1)
go func() {
defer func() {
options.WaitGroup.Done()
if err := journal.Close(); err != nil {
logrus.Errorf("Unable to close journal: %v", err)
}
}()

afterTimeStamp := false // needed for options.Since
tailQueue := []*logs.LogLine{} // needed for options.Tail
doTail := options.Tail > 0
for {
select {
case <-ctx.Done():
// Remote client may have closed/lost the connection.
return
default:
// Fallthrough
}

if _, err := journal.Next(); err != nil {
logrus.Errorf("Failed to move journal cursor to next entry: %v", err)
return
}
latestCursor, err := journal.GetCursor()
if err != nil {
logrus.Errorf("Failed to get journal cursor: %v", err)
return
}

// Hit the end of the journal.
if cursor == latestCursor {
if doTail {
// Flush *once* we hit the end of the journal.
startIndex := int64(len(tailQueue)-1) - options.Tail
if startIndex < 0 {
startIndex = 0
}
for i := startIndex; i < int64(len(tailQueue)); i++ {
logChannel <- tailQueue[i]
}
tailQueue = nil
doTail = false
}
// Unless we follow, quit.
if !options.Follow {
return
}
}()
go func() {
// FIXME (#10323): we are facing a terrible
// race condition here. At the time the
// container dies and `c.Wait()` has returned,
// we may not have received all journald logs.
// So far there is no other way than waiting
// for a second. Ultimately, `r.Follow` is
// racy and we may have to implement our custom
// logic here.
c.Wait(ctx)
time.Sleep(time.Second)
until <- time.Time{}
}()
follower := journaldFollowBuffer{logChannel, options.Multi}
err := r.Follow(until, follower)
// Sleep until something's happening on the journal.
journal.Wait(sdjournal.IndefiniteWait)
continue
}
cursor = latestCursor

entry, err := journal.GetEntry()
if err != nil {
logrus.Debugf(err.Error())
logrus.Errorf("Failed to get journal entry: %v", err)
return
}
r.Close()
options.WaitGroup.Done()
done <- true
return
}()
return nil
}

go func() {
bytes := make([]byte, bufLen)
// /me complains about no do-while in go
ec, err := r.Read(bytes)
for ec != 0 && err == nil {
// because we are reusing bytes, we need to make
// sure the old data doesn't get into the new line
bytestr := string(bytes[:ec])
logLine, err2 := logs.NewJournaldLogLine(bytestr, options.Multi)
if err2 != nil {
logrus.Error(err2)
if !afterTimeStamp {
entryTime := time.Unix(0, int64(entry.RealtimeTimestamp)*int64(time.Microsecond))
if entryTime.Before(options.Since) {
continue
}
afterTimeStamp = true
}

// If we're reading an event and the container exited/died,
// then we're done and can return.
event, ok := entry.Fields["PODMAN_EVENT"]
if ok {
status, err := events.StringToStatus(event)
if err != nil {
logrus.Errorf("Failed to translate event: %v", err)
return
}
if status == events.Exited {
return
}
continue
}

var message string
var formatError error

if options.Multi {
message, formatError = journalFormatterWithID(entry)
} else {
message, formatError = journalFormatter(entry)
}

if formatError != nil {
logrus.Errorf("Failed to parse journald log entry: %v", err)
return
}

logLine, err := logs.NewJournaldLogLine(message, options.Multi)
if err != nil {
logrus.Errorf("Failed parse log line: %v", err)
return
}
if doTail {
tailQueue = append(tailQueue, logLine)
continue
}
logChannel <- logLine
ec, err = r.Read(bytes)
}
if err != nil && err != io.EOF {
logrus.Error(err)
}
r.Close()
options.WaitGroup.Done()
}()

return nil
}

func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
func journalFormatterWithID(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
Expand All @@ -162,7 +225,7 @@ func journalFormatterWithID(entry *journal.JournalEntry) (string, error) {
return output, nil
}

func journalFormatter(entry *journal.JournalEntry) (string, error) {
func journalFormatter(entry *sdjournal.JournalEntry) (string, error) {
output, err := formatterPrefix(entry)
if err != nil {
return "", err
Expand All @@ -176,7 +239,7 @@ func journalFormatter(entry *journal.JournalEntry) (string, error) {
return output, nil
}

func formatterPrefix(entry *journal.JournalEntry) (string, error) {
func formatterPrefix(entry *sdjournal.JournalEntry) (string, error) {
usec := entry.RealtimeTimestamp
tsString := time.Unix(0, int64(usec)*int64(time.Microsecond)).Format(logs.LogTimeFormat)
output := fmt.Sprintf("%s ", tsString)
Expand All @@ -202,7 +265,7 @@ func formatterPrefix(entry *journal.JournalEntry) (string, error) {
return output, nil
}

func formatterMessage(entry *journal.JournalEntry) (string, error) {
func formatterMessage(entry *sdjournal.JournalEntry) (string, error) {
// Finally, append the message
msg, ok := entry.Fields["MESSAGE"]
if !ok {
Expand All @@ -211,18 +274,3 @@ func formatterMessage(entry *journal.JournalEntry) (string, error) {
msg = strings.TrimSuffix(msg, "\n")
return msg, nil
}

// journaldFollowBuffer is a writer-style sink: every chunk written to it is
// parsed as one journald log line and forwarded to logChannel.
type journaldFollowBuffer struct {
	// logChannel receives one parsed log line per successful Write.
	logChannel chan *logs.LogLine
	// withID is handed to logs.NewJournaldLogLine for every written chunk.
	withID bool
}

// Write parses p as a single journald log line and sends the result on
// f.logChannel.
//
// On success it reports len(p) bytes as consumed. If parsing fails, nothing is
// sent and it returns 0 together with the parse error; a negative count would
// violate the io.Writer contract (0 <= n <= len(p)).
//
// The channel send may block, depending on how logChannel is buffered and
// drained by its consumer.
func (f journaldFollowBuffer) Write(p []byte) (int, error) {
	logLine, err := logs.NewJournaldLogLine(string(p), f.withID)
	if err != nil {
		return 0, err
	}
	f.logChannel <- logLine
	return len(p), nil
}
2 changes: 1 addition & 1 deletion test/e2e/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ var _ = Describe("Podman logs", func() {
})

It("podman logs on a created container should result in 0 exit code: "+log, func() {
session := podmanTest.Podman([]string{"create", "-t", "--name", "log", ALPINE})
session := podmanTest.Podman([]string{"create", "--log-driver", log, "-t", "--name", "log", ALPINE})
session.WaitWithDefaultTimeout()
Expect(session).To(Exit(0))

Expand Down
Loading