Skip to content

Commit

Permalink
filebeat/input/journald: allow specifying since when to read journal…
Browse files Browse the repository at this point in the history
…d entries (#35408)

Co-authored-by: Andrew Kroh <[email protected]>
  • Loading branch information
efd6 and andrewkroh authored Jul 27, 2023
1 parent b0124b4 commit 739e381
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add execution budget to CEL input. {pull}35409[35409]
- Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235]
- Add delegated account support when using Google ADC in `httpjson` input. {pull}35507[35507]
- Allow specifying since when to read journald entries. {pull}35408[35408]
- Add metrics for filestream input. {pull}35529[35529]
- Add support for collecting `httpjson` metrics. {pull}35392[35392]
- Add XML decoding support to CEL. {issue}34438[34438] {pull}35372[35372]
Expand Down
21 changes: 20 additions & 1 deletion filebeat/docs/inputs/input-journald.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ The position to start reading the journal from. Valid settings are:
will be sent until a new message is written.
* `cursor`: On first read, starts reading at the beginning of the journal. After
a reload or restart, continues reading at the last known position.
* `since`: Starts reading at the time offset from now given by the `since` option.

If you have old log files and want to skip lines, start {beatname_uc} with
`seek: tail` specified. Then stop {beatname_uc}, set `seek: cursor`, and restart
Expand All @@ -136,7 +137,25 @@ If you have old log files and want to skip lines, start {beatname_uc} with
==== `cursor_seek_fallback`

The position to start reading the journal from if no cursor information is
available. Valid options are `head` and `tail`.
available. Valid options are `head`, `tail` and `since`.

[float]
[id="{beatname_lc}-input-{type}-since"]
==== `since`

A time offset relative to the current time that determines where to start
reading from. Use a negative duration to start in the past (for example, `-24h`).
To use `since`, either the `seek` option must be set to `since`, or the `seek`
option must be set to `cursor` and `cursor_seek_fallback` set to `since`.

This example demonstrates how to resume from the persisted cursor when
it exists, or otherwise begin reading logs from the last 24 hours.

["source","yaml",subs="attributes"]
----
seek: cursor
cursor_seek_fallback: since
since: -24h
----

[float]
[id="{beatname_lc}-input-{type}-units"]
Expand Down
30 changes: 28 additions & 2 deletions filebeat/input/journald/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ type config struct {
// MaxBackoff is the limit of the backoff time.
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`

// Since is the relative time offset from now to provide journal
// entries from. If Since is nil, no offset is applied.
Since *time.Duration `config:"since"`

// Seek is the method to read from journals.
Seek journalread.SeekMode `config:"seek"`

Expand Down Expand Up @@ -100,7 +104,11 @@ func (im *bwcIncludeMatches) Unpack(c *ucfg.Config) error {
return c.Unpack((*journalfield.IncludeMatches)(im))
}

// Sentinel errors returned by (*config).Validate.
var (
	// errInvalidSeekFallback reports a cursor_seek_fallback value that is
	// not one of head, tail or since.
	errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback")
	// errInvalidSeek reports an invalid seek mode.
	errInvalidSeek = errors.New("invalid setting for seek")
	// errInvalidSeekSince reports that the since option and the configured
	// seek or cursor_seek_fallback modes do not agree with each other.
	errInvalidSeekSince = errors.New("incompatible setting for since and seek or cursor_seek_fallback")
)

func defaultConfig() config {
return config{
Expand All @@ -113,8 +121,26 @@ func defaultConfig() config {
}

func (c *config) Validate() error {
if c.CursorSeekFallback != journalread.SeekHead && c.CursorSeekFallback != journalread.SeekTail {
if c.Seek == journalread.SeekInvalid {
return errInvalidSeek
}
switch c.CursorSeekFallback {
case journalread.SeekHead, journalread.SeekTail, journalread.SeekSince:
default:
return errInvalidSeekFallback
}
if c.Since == nil {
switch {
case c.Seek == journalread.SeekSince,
c.Seek == journalread.SeekCursor && c.CursorSeekFallback == journalread.SeekSince:
return errInvalidSeekSince
default:
return nil
}
}
needSince := c.Seek == journalread.SeekSince || (c.Seek == journalread.SeekCursor && c.CursorSeekFallback == journalread.SeekSince)
if !needSince {
return errInvalidSeekSince
}
return nil
}
98 changes: 98 additions & 0 deletions filebeat/input/journald/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@
package journald

import (
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

jr "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalread"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

func TestConfigIncludeMatches(t *testing.T) {
Expand Down Expand Up @@ -62,3 +67,96 @@ include_matches:
verify(t, yaml)
})
}

// TestConfigValidate checks config.Validate against every combination of
// seek mode, cursor_seek_fallback mode and presence of the since option, and
// then confirms that no configuration accepted by Validate can make seekBy
// select the since mode while since is unset.
func TestConfigValidate(t *testing.T) {
	t.Run("table", func(t *testing.T) {
		nameOf := [...]string{
			jr.SeekInvalid: "invalid",
			jr.SeekHead:    "head",
			jr.SeekTail:    "tail",
			jr.SeekCursor:  "cursor",
			jr.SeekSince:   "since",
		}

		modes := []jr.SeekMode{
			jr.SeekInvalid,
			jr.SeekHead,
			jr.SeekTail,
			jr.SeekCursor,
			jr.SeekSince,
		}
		const n = jr.SeekSince + 1

		errSeek := errInvalidSeek
		errFall := errInvalidSeekFallback
		errSince := errInvalidSeekSince
		// want is the table of expectations, keyed by whether the since
		// option is set: seek in major, fallback in minor.
		want := map[bool][n][n]error{
			false: { // No since option set.
				jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek},
				jr.SeekHead:    {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil},
				jr.SeekTail:    {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil},
				jr.SeekCursor:  {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: errSince},
				jr.SeekSince:   {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince},
			},
			true: { // Since option set.
				jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek},
				jr.SeekHead:    {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince},
				jr.SeekTail:    {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince},
				jr.SeekCursor:  {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: nil},
				jr.SeekSince:   {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil},
			},
		}

		for setSince := range want {
			for _, seek := range modes {
				for _, fallback := range modes {
					name := fmt.Sprintf("seek_%s_fallback_%s_since_%t", nameOf[seek], nameOf[fallback], setSince)
					t.Run(name, func(t *testing.T) {
						cfg := config{Seek: seek, CursorSeekFallback: fallback}
						if setSince {
							cfg.Since = new(time.Duration)
						}
						got := cfg.Validate()
						if !errors.Is(got, want[setSince][seek][fallback]) {
							t.Errorf("unexpected error: got:%v want:%v", got, want[setSince][seek][fallback])
						}
					})
				}
			}
		}
	})

	t.Run("use", func(t *testing.T) {
		logger := logp.L()
		// Both loop bounds run one past jr.SeekSince, so an out-of-range
		// mode value is included in the combinations tried.
		for seek := jr.SeekInvalid; seek <= jr.SeekSince+1; seek++ {
			for seekFallback := jr.SeekInvalid; seekFallback <= jr.SeekSince+1; seekFallback++ {
				for _, since := range []*time.Duration{nil, new(time.Duration)} {
					for _, pos := range []string{"", "defined"} {
						// Construct a config with fields checked by Validate.
						cfg := config{
							Since:              since,
							Seek:               seek,
							CursorSeekFallback: seekFallback,
						}
						if err := cfg.Validate(); err != nil {
							continue
						}

						// Confirm we never get to seek since mode with a nil since.
						cp := checkpoint{Position: pos}
						mode, _ := seekBy(logger, cp, cfg.Seek, cfg.CursorSeekFallback)
						if mode == jr.SeekSince && cfg.Since == nil {
							// If we reach here we would have panicked in Run.
							// since is a pointer, so it is printed with %v
							// to show <nil> rather than a numeric address.
							t.Errorf("got nil since in valid seek since mode: seek=%d seek_fallback=%d since=%v cp=%+v",
								seek, seekFallback, since, cp)
						}
					}
				}
			}
		}
	})
}
25 changes: 19 additions & 6 deletions filebeat/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
type journald struct {
Backoff time.Duration
MaxBackoff time.Duration
Since *time.Duration
Seek journalread.SeekMode
CursorSeekFallback journalread.SeekMode
Matches journalfield.IncludeMatches
Expand Down Expand Up @@ -104,6 +105,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
return sources, &journald{
Backoff: config.Backoff,
MaxBackoff: config.MaxBackoff,
Since: config.Since,
Seek: config.Seek,
CursorSeekFallback: config.CursorSeekFallback,
Matches: journalfield.IncludeMatches(config.Matches),
Expand Down Expand Up @@ -140,7 +142,13 @@ func (inp *journald) Run(
}
defer reader.Close()

if err := reader.Seek(seekBy(ctx.Logger, currentCheckpoint, inp.Seek, inp.CursorSeekFallback)); err != nil {
mode, pos := seekBy(ctx.Logger, currentCheckpoint, inp.Seek, inp.CursorSeekFallback)
if mode == journalread.SeekSince {
err = reader.SeekRealtimeUsec(uint64(time.Now().Add(*inp.Since).UnixMicro()))
} else {
err = reader.Seek(mode, pos)
}
if err != nil {
log.Error("Continue from current position. Seek failed with: %v", err)
}

Expand Down Expand Up @@ -168,7 +176,10 @@ func (inp *journald) Run(
func (inp *journald) open(log *logp.Logger, canceler input.Canceler, src cursor.Source) (*journalread.Reader, error) {
backoff := backoff.NewExpBackoff(canceler.Done(), inp.Backoff, inp.MaxBackoff)
reader, err := journalread.Open(log, src.Name(), backoff,
withFilters(inp.Matches), withUnits(inp.Units), withTransports(inp.Transports), withSyslogIdentifiers(inp.Identifiers))
withFilters(inp.Matches),
withUnits(inp.Units),
withTransports(inp.Transports),
withSyslogIdentifiers(inp.Identifiers))
if err != nil {
return nil, sderr.Wrap(err, "failed to create reader for %{path} journal", src.Name())
}
Expand Down Expand Up @@ -223,12 +234,14 @@ func withSyslogIdentifiers(identifiers []string) func(*sdjournal.Journal) error
// seekBy tries to find the last known position in the journal, so we can continue collecting
// from the last known position.
// The checkpoint is ignored if the user has configured the input to always
// seek to the head/tail of the journal on startup.
func seekBy(log *logp.Logger, cp checkpoint, seek, defaultSeek journalread.SeekMode) (journalread.SeekMode, string) {
mode := seek
// seek to the head/tail/since of the journal on startup.
func seekBy(log *logp.Logger, cp checkpoint, seek, defaultSeek journalread.SeekMode) (mode journalread.SeekMode, pos string) {
mode = seek
if mode == journalread.SeekCursor && cp.Position == "" {
mode = defaultSeek
if mode != journalread.SeekHead && mode != journalread.SeekTail {
switch mode {
case journalread.SeekHead, journalread.SeekTail, journalread.SeekSince:
default:
log.Error("Invalid option for cursor_seek_fallback")
mode = journalread.SeekHead
}
Expand Down
58 changes: 37 additions & 21 deletions filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"path"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -219,6 +220,20 @@ func TestInputIncludeMatches(t *testing.T) {
// TestInputSeek test the output of various seek modes while reading
// from input-multiline-parser.journal.
func TestInputSeek(t *testing.T) {
// timeOfFirstEvent is the @timestamp on the "pam_unix" message.
var timeOfFirstEvent = time.Date(2021, time.November, 22, 17, 10, 4, 51729000, time.UTC)

var allMessages = []string{
"pam_unix(sudo:session): session closed for user root",
"Started Outputs some log lines.",
"1st line",
"2nd line",
"3rd line",
"4th line",
"5th line",
"6th line",
}

tests := map[string]struct {
config mapstr.M
expectedMessages []string
Expand All @@ -227,16 +242,7 @@ func TestInputSeek(t *testing.T) {
config: map[string]any{
"seek": "head",
},
expectedMessages: []string{
"pam_unix(sudo:session): session closed for user root",
"Started Outputs some log lines.",
"1st line",
"2nd line",
"3rd line",
"4th line",
"5th line",
"6th line",
},
expectedMessages: allMessages,
},
"seek tail": {
config: map[string]any{
Expand All @@ -248,24 +254,34 @@ func TestInputSeek(t *testing.T) {
config: map[string]any{
"seek": "cursor",
},
expectedMessages: []string{
"pam_unix(sudo:session): session closed for user root",
"Started Outputs some log lines.",
"1st line",
"2nd line",
"3rd line",
"4th line",
"5th line",
"6th line",
},
expectedMessages: allMessages,
},
"seek cursor fallback": {
"seek cursor with tail fallback": {
config: map[string]any{
"seek": "cursor",
"cursor_seek_fallback": "tail",
},
expectedMessages: nil, // No messages are expected because it will fall back to seek=tail.
},
"seek since": {
config: map[string]any{
"seek": "since",
// Query using one microsecond after the first event so that the first event
// is not returned. Note that journald uses microsecond precision for times.
"since": -1 * time.Since(timeOfFirstEvent.Add(time.Microsecond)),
},
expectedMessages: allMessages[1:],
},
"seek cursor with since fallback": {
config: map[string]any{
"seek": "cursor",
"cursor_seek_fallback": "since",
// Query using one microsecond after the first event so that the first event
// is not returned. Note that journald uses microsecond precision for times.
"since": -1 * time.Since(timeOfFirstEvent.Add(time.Microsecond)),
},
expectedMessages: allMessages[1:],
},
}

for name, testCase := range tests {
Expand Down
3 changes: 3 additions & 0 deletions filebeat/input/journald/pkg/journalread/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ const (
SeekTail
// SeekCursor option seeks to the position specified in the cursor
SeekCursor
// SeekSince option seeks to the position specified by the since option
SeekSince
)

var seekModes = map[string]SeekMode{
"head": SeekHead,
"tail": SeekTail,
"cursor": SeekCursor,
"since": SeekSince,
}

// Unpack validates and unpack "seek" config options. It returns an error if
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/journald/pkg/journalread/mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestMode_Unpack(t *testing.T) {
"head": SeekHead,
"tail": SeekTail,
"cursor": SeekCursor,
"since": SeekSince,
}

for str, want := range tests {
Expand Down
Loading

0 comments on commit 739e381

Please sign in to comment.