From 0bdabc7a74e9d6960fa959954b59cd5dc6dc6443 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Fri, 5 May 2023 16:51:51 +0930 Subject: [PATCH 1/9] filebeat/input/journald: allow specifying since when to read journald entries --- CHANGELOG.next.asciidoc | 1 + filebeat/docs/inputs/input-journald.asciidoc | 16 +++++ filebeat/input/journald/config.go | 30 +++++++- filebeat/input/journald/config_test.go | 72 +++++++++++++++++++ filebeat/input/journald/input.go | 25 +++++-- .../input/journald/pkg/journalread/mode.go | 3 + .../journald/pkg/journalread/mode_test.go | 1 + .../input/journald/pkg/journalread/reader.go | 15 ++++ 8 files changed, 155 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index d7e4277fcb8e..796daaf785c4 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -288,6 +288,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Add execution budget to CEL input. {pull}35409[35409] - Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235] - Add delegated account support when using Google ADC in `httpjson` input. {pull}35507[35507] +- Allow specifying since when to read journald entries. {pull}35408[35408] *Auditbeat* - Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817] diff --git a/filebeat/docs/inputs/input-journald.asciidoc b/filebeat/docs/inputs/input-journald.asciidoc index bbc4211b0c51..41edd2ea505e 100644 --- a/filebeat/docs/inputs/input-journald.asciidoc +++ b/filebeat/docs/inputs/input-journald.asciidoc @@ -128,11 +128,27 @@ multiple log messages are written to a journal while {beatname_uc} is down, only the last log message is sent on restart. * `cursor`: On first read, starts reading at the beginning of the journal. After a reload or restart, continues reading at the last known position. +* `since`: Use the `since` option to determine where to start reading from. If you have old log files and want to skip lines, start {beatname_uc} with `seek: tail` specified. Then stop {beatname_uc}, set `seek: cursor`, and restart {beatname_uc}. +[float] +[id="{beatname_lc}-input-{type}-cursor_seek_fallback"] +==== `cursor_seek_fallback` + +The position to start reading the journal from if no cursor information is +available. Valid options are `head`, `tail` and `since`. + +[float] +[id="{beatname_lc}-input-{type}-since"] +==== `since` + +A time offset from the current time's record to start reading from. To use +`since`, either the `seek` option must be set to `since`, or the `seek` mode +must be set to `cursor` and the `cursor_seek_fallback` set to `since`. + [float] [id="{beatname_lc}-input-{type}-units"] ==== `units` diff --git a/filebeat/input/journald/config.go b/filebeat/input/journald/config.go index 60f5881341b8..558cf89edff1 100644 --- a/filebeat/input/journald/config.go +++ b/filebeat/input/journald/config.go @@ -48,6 +48,10 @@ type config struct { // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` + // Since is the relative time offset from now to provide journal + // entries from. If Since is nil, no offset is applied. + Since *time.Duration `config:"since"` + // Seek is the method to read from journals. Seek journalread.SeekMode `config:"seek"` @@ -100,7 +104,11 @@ func (im *bwcIncludeMatches) Unpack(c *ucfg.Config) error { return c.Unpack((*journalfield.IncludeMatches)(im)) } -var errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback") +var ( + errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback") + errInvalidSeek = errors.New("invalid setting for seek") + errInvalidSeekSince = errors.New("incompatible setting for since and seek or cursor") +) func defaultConfig() config { return config{ @@ -113,8 +121,26 @@ func defaultConfig() config { } func (c *config) Validate() error { - if c.CursorSeekFallback != journalread.SeekHead && c.CursorSeekFallback != journalread.SeekTail { + if c.Seek == journalread.SeekInvalid { + return errInvalidSeek + } + switch c.CursorSeekFallback { + case journalread.SeekHead, journalread.SeekTail, journalread.SeekSince: + default: return errInvalidSeekFallback } + if c.Since == nil { + switch { + case c.Seek == journalread.SeekSince, + c.Seek == journalread.SeekCursor && c.CursorSeekFallback == journalread.SeekSince: + return errInvalidSeekSince + default: + return nil + } + } + needSince := c.Seek == journalread.SeekSince || (c.Seek == journalread.SeekCursor && c.CursorSeekFallback == journalread.SeekSince) + if !needSince { + return errInvalidSeekSince + } return nil } diff --git a/filebeat/input/journald/config_test.go b/filebeat/input/journald/config_test.go index e5205d8161dd..eb11b6e80b4a 100644 --- a/filebeat/input/journald/config_test.go +++ b/filebeat/input/journald/config_test.go @@ -20,11 +20,14 @@ package journald import ( + "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + jr "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalread" conf "github.com/elastic/elastic-agent-libs/config" ) @@ -62,3 +65,72 @@ include_matches: verify(t, yaml) }) } + +func TestConfigValidate(t *testing.T) { + nameOf := [...]string{ + jr.SeekInvalid: "invalid", + jr.SeekHead: "head", + jr.SeekTail: "tail", + jr.SeekCursor: "cursor", + jr.SeekSince: "since", + } + + modes := []jr.SeekMode{ + jr.SeekInvalid, + jr.SeekHead, + jr.SeekTail, + jr.SeekCursor, + jr.SeekSince, + } + const n = jr.SeekSince + 1 + + errSeek := errInvalidSeek + errFall := errInvalidSeekFallback + errSince := errInvalidSeekSince + // Want is the tables of expectations: seek in major, fallback in minor. + want := map[bool][n][n]error{ + false: { // No since option set. + jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek}, + jr.SeekHead: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil}, + jr.SeekTail: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil}, + jr.SeekCursor: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: errSince}, + jr.SeekSince: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince}, + }, + true: { // Since option set. + jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek}, + jr.SeekHead: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince}, + jr.SeekTail: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince}, + jr.SeekCursor: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: nil}, + jr.SeekSince: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil}, + }, + } + + for setSince := range want { + for _, seek := range modes { + for _, fallback := range modes { + name := fmt.Sprintf("seek_%s_fallback_%s_since_%t", nameOf[seek], nameOf[fallback], setSince) + t.Run(name, func(t *testing.T) { + cfg := config{Seek: seek, CursorSeekFallback: fallback} + if setSince { + cfg.Since = new(time.Duration) + } + got := cfg.Validate() + if !sameError(got, want[setSince][seek][fallback]) { + t.Errorf("unexpected error: got:%v want:%v", got, want[setSince][seek][fallback]) + } + }) + } + } + } +} + +func sameError(a, b error) bool { + switch { + case a == nil && b == nil: + return true + case a == nil, b == nil: + return false + default: + return a.Error() == b.Error() + } +} diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index e4d3d858dd65..5f36a1bcdd19 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -40,6 +40,7 @@ import ( type journald struct { Backoff time.Duration MaxBackoff time.Duration + Since *time.Duration Seek journalread.SeekMode CursorSeekFallback journalread.SeekMode Matches journalfield.IncludeMatches @@ -104,6 +105,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { return sources, &journald{ Backoff: config.Backoff, MaxBackoff: config.MaxBackoff, + Since: config.Since, Seek: config.Seek, CursorSeekFallback: config.CursorSeekFallback, Matches: journalfield.IncludeMatches(config.Matches), @@ -140,7 +142,13 @@ func (inp *journald) Run( } defer reader.Close() - if err := reader.Seek(seekBy(ctx.Logger, currentCheckpoint, inp.Seek, inp.CursorSeekFallback)); err != nil { + mode, pos := seekBy(ctx.Logger, currentCheckpoint, inp.Seek, inp.CursorSeekFallback) + if mode == journalread.SeekSince { + err = reader.SeekRealtimeUsec(uint64(time.Now().Add(*inp.Since).UnixNano() / 1000)) + } else { + err = reader.Seek(mode, pos) + } + if err != nil { log.Error("Continue from current position. Seek failed with: %v", err) } @@ -168,7 +176,10 @@ func (inp *journald) Run( func (inp *journald) open(log *logp.Logger, canceler input.Canceler, src cursor.Source) (*journalread.Reader, error) { backoff := backoff.NewExpBackoff(canceler.Done(), inp.Backoff, inp.MaxBackoff) reader, err := journalread.Open(log, src.Name(), backoff, - withFilters(inp.Matches), withUnits(inp.Units), withTransports(inp.Transports), withSyslogIdentifiers(inp.Identifiers)) + withFilters(inp.Matches), + withUnits(inp.Units), + withTransports(inp.Transports), + withSyslogIdentifiers(inp.Identifiers)) if err != nil { return nil, sderr.Wrap(err, "failed to create reader for %{path} journal", src.Name()) } @@ -223,12 +234,14 @@ func withSyslogIdentifiers(identifiers []string) func(*sdjournal.Journal) error // seekBy tries to find the last known position in the journal, so we can continue collecting // from the last known position. // The checkpoint is ignored if the user has configured the input to always -// seek to the head/tail of the journal on startup. -func seekBy(log *logp.Logger, cp checkpoint, seek, defaultSeek journalread.SeekMode) (journalread.SeekMode, string) { - mode := seek +// seek to the head/tail/since of the journal on startup. +func seekBy(log *logp.Logger, cp checkpoint, seek, defaultSeek journalread.SeekMode) (mode journalread.SeekMode, pos string) { + mode = seek if mode == journalread.SeekCursor && cp.Position == "" { mode = defaultSeek - if mode != journalread.SeekHead && mode != journalread.SeekTail { + switch mode { + case journalread.SeekHead, journalread.SeekTail, journalread.SeekSince: + default: log.Error("Invalid option for cursor_seek_fallback") mode = journalread.SeekHead } diff --git a/filebeat/input/journald/pkg/journalread/mode.go b/filebeat/input/journald/pkg/journalread/mode.go index 36132ffe11d6..3c6fa923361a 100644 --- a/filebeat/input/journald/pkg/journalread/mode.go +++ b/filebeat/input/journald/pkg/journalread/mode.go @@ -31,12 +31,15 @@ const ( SeekTail // SeekCursor option seeks to the position specified in the cursor SeekCursor + // SeekSince option seeks to the position specified by the since option + SeekSince ) var seekModes = map[string]SeekMode{ "head": SeekHead, "tail": SeekTail, "cursor": SeekCursor, + "since": SeekSince, } // Unpack validates and unpack "seek" config options. It returns an error if diff --git a/filebeat/input/journald/pkg/journalread/mode_test.go b/filebeat/input/journald/pkg/journalread/mode_test.go index aef0ed4150cc..7b323a06be58 100644 --- a/filebeat/input/journald/pkg/journalread/mode_test.go +++ b/filebeat/input/journald/pkg/journalread/mode_test.go @@ -27,6 +27,7 @@ func TestMode_Unpack(t *testing.T) { "head": SeekHead, "tail": SeekTail, "cursor": SeekCursor, + "since": SeekSince, } for str, want := range tests { diff --git a/filebeat/input/journald/pkg/journalread/reader.go b/filebeat/input/journald/pkg/journalread/reader.go index 571de214b557..474f4a67c3d7 100644 --- a/filebeat/input/journald/pkg/journalread/reader.go +++ b/filebeat/input/journald/pkg/journalread/reader.go @@ -54,6 +54,7 @@ type journal interface { GetEntry() (*sdjournal.JournalEntry, error) SeekHead() error SeekTail() error + SeekRealtimeUsec(usec uint64) error SeekCursor(string) error } @@ -146,6 +147,20 @@ func (r *Reader) Seek(mode SeekMode, cursor string) (err error) { return err } +// Seek moves the read pointer to a new position. +// If a cursor or SeekTail is given, Seek tries to ignore the entry at the +// given position, jumping right to the next entry. +func (r *Reader) SeekRealtimeUsec(usec uint64) error { + err := r.journal.SeekRealtimeUsec(usec) + if err != nil { + return err + } + // A call to Next is required for the journal to be in a valid state to call Get. + // https://pkg.go.dev/github.com/coreos/go-systemd/v22/sdjournal#Journal.SeekRealtimeUsec + _, err = r.journal.Next() + return err +} + // Next reads a new journald entry from the journal. It blocks if there is // currently no entry available in the journal, or until an error has occurred. func (r *Reader) Next(cancel canceler) (*sdjournal.JournalEntry, error) { From 1e8fcea223693d3026964cd5d4a0645840268c65 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 10 May 2023 11:10:40 +0930 Subject: [PATCH 2/9] remove unecessary interface indirection --- filebeat/input/journald/pkg/journalread/reader.go | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/filebeat/input/journald/pkg/journalread/reader.go b/filebeat/input/journald/pkg/journalread/reader.go index 474f4a67c3d7..78dd9f1d030f 100644 --- a/filebeat/input/journald/pkg/journalread/reader.go +++ b/filebeat/input/journald/pkg/journalread/reader.go @@ -39,7 +39,7 @@ import ( type Reader struct { log *logp.Logger backoff backoff.Backoff - journal journal + journal *sdjournal.Journal } type canceler interface { @@ -47,23 +47,12 @@ type canceler interface { Err() error } -type journal interface { - Close() error - Next() (uint64, error) - Wait(time.Duration) int - GetEntry() (*sdjournal.JournalEntry, error) - SeekHead() error - SeekTail() error - SeekRealtimeUsec(usec uint64) error - SeekCursor(string) error -} - // LocalSystemJournalID is the ID of the local system journal. const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL" // NewReader creates a new Reader for an already opened journal. The reader assumed to take // ownership of the journal, and needs to be closed. -func NewReader(log *logp.Logger, journal journal, backoff backoff.Backoff) *Reader { +func NewReader(log *logp.Logger, journal *sdjournal.Journal, backoff backoff.Backoff) *Reader { return &Reader{log: log, journal: journal, backoff: backoff} } From 15979c19d60fefdedd3365b613c719dcee1bd0d4 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 11 May 2023 09:37:14 +0930 Subject: [PATCH 3/9] address pr comments --- filebeat/docs/inputs/input-journald.asciidoc | 2 +- filebeat/input/journald/config.go | 2 +- filebeat/input/journald/pkg/journalread/reader.go | 15 ++++----------- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/filebeat/docs/inputs/input-journald.asciidoc b/filebeat/docs/inputs/input-journald.asciidoc index 41edd2ea505e..1367269e1d9a 100644 --- a/filebeat/docs/inputs/input-journald.asciidoc +++ b/filebeat/docs/inputs/input-journald.asciidoc @@ -145,7 +145,7 @@ available. Valid options are `head`, `tail` and `since`. [id="{beatname_lc}-input-{type}-since"] ==== `since` -A time offset from the current time's record to start reading from. To use +A time offset from the current time to start reading from. To use `since`, either the `seek` option must be set to `since`, or the `seek` mode must be set to `cursor` and the `cursor_seek_fallback` set to `since`. diff --git a/filebeat/input/journald/config.go b/filebeat/input/journald/config.go index 558cf89edff1..5f18cbce211f 100644 --- a/filebeat/input/journald/config.go +++ b/filebeat/input/journald/config.go @@ -107,7 +107,7 @@ func (im *bwcIncludeMatches) Unpack(c *ucfg.Config) error { var ( errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback") errInvalidSeek = errors.New("invalid setting for seek") - errInvalidSeekSince = errors.New("incompatible setting for since and seek or cursor") + errInvalidSeekSince = errors.New("incompatible setting for since and seek or cursor_seek_fallback") ) func defaultConfig() config { diff --git a/filebeat/input/journald/pkg/journalread/reader.go b/filebeat/input/journald/pkg/journalread/reader.go index 78dd9f1d030f..5f647c2b09b9 100644 --- a/filebeat/input/journald/pkg/journalread/reader.go +++ b/filebeat/input/journald/pkg/journalread/reader.go @@ -136,18 +136,11 @@ func (r *Reader) Seek(mode SeekMode, cursor string) (err error) { return err } -// Seek moves the read pointer to a new position. -// If a cursor or SeekTail is given, Seek tries to ignore the entry at the -// given position, jumping right to the next entry. +// SeekRealtimeUsec moves the read pointer to the entry with the +// specified CLOCK_REALTIME timestamp. This corresponds to +// sd_journal_seek_realtime_usec. func (r *Reader) SeekRealtimeUsec(usec uint64) error { - err := r.journal.SeekRealtimeUsec(usec) - if err != nil { - return err - } - // A call to Next is required for the journal to be in a valid state to call Get. - // https://pkg.go.dev/github.com/coreos/go-systemd/v22/sdjournal#Journal.SeekRealtimeUsec - _, err = r.journal.Next() - return err + return r.journal.SeekRealtimeUsec(usec) } // Next reads a new journald entry from the journal. It blocks if there is From b673122ee89ca30ae8e11b362d06b3d7b71a1e9a Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Fri, 26 May 2023 19:15:51 +0930 Subject: [PATCH 4/9] address pr comment --- filebeat/input/journald/config_test.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/filebeat/input/journald/config_test.go b/filebeat/input/journald/config_test.go index eb11b6e80b4a..68e65fc1e8cc 100644 --- a/filebeat/input/journald/config_test.go +++ b/filebeat/input/journald/config_test.go @@ -20,6 +20,7 @@ package journald import ( + "errors" "fmt" "testing" "time" @@ -115,7 +116,7 @@ func TestConfigValidate(t *testing.T) { cfg.Since = new(time.Duration) } got := cfg.Validate() - if !sameError(got, want[setSince][seek][fallback]) { + if !errors.Is(got, want[setSince][seek][fallback]) { t.Errorf("unexpected error: got:%v want:%v", got, want[setSince][seek][fallback]) } }) @@ -123,14 +124,3 @@ func TestConfigValidate(t *testing.T) { } } } - -func sameError(a, b error) bool { - switch { - case a == nil && b == nil: - return true - case a == nil, b == nil: - return false - default: - return a.Error() == b.Error() - } -} From c9ae18197f00f1dd6b24515ea8bcd3a9d3767fb8 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sat, 27 May 2023 06:35:51 +0930 Subject: [PATCH 5/9] address pr comment --- filebeat/input/journald/pkg/journalread/reader.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/filebeat/input/journald/pkg/journalread/reader.go b/filebeat/input/journald/pkg/journalread/reader.go index 5f647c2b09b9..9994c0aad7c1 100644 --- a/filebeat/input/journald/pkg/journalread/reader.go +++ b/filebeat/input/journald/pkg/journalread/reader.go @@ -39,7 +39,7 @@ import ( type Reader struct { log *logp.Logger backoff backoff.Backoff - journal *sdjournal.Journal + journal journal } type canceler interface { @@ -47,12 +47,23 @@ type canceler interface { Err() error } +type journal interface { + Close() error + Next() (uint64, error) + Wait(time.Duration) int + GetEntry() (*sdjournal.JournalEntry, error) + SeekHead() error + SeekTail() error + SeekRealtimeUsec(usec uint64) error + SeekCursor(string) error +} + // LocalSystemJournalID is the ID of the local system journal. const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL" // NewReader creates a new Reader for an already opened journal. The reader assumed to take // ownership of the journal, and needs to be closed. -func NewReader(log *logp.Logger, journal *sdjournal.Journal, backoff backoff.Backoff) *Reader { +func NewReader(log *logp.Logger, journal journal, backoff backoff.Backoff) *Reader { return &Reader{log: log, journal: journal, backoff: backoff} } From 25456bcb8b5f22054e0ca67946cd125f955a06af Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sat, 27 May 2023 06:38:26 +0930 Subject: [PATCH 6/9] address pr comment --- filebeat/input/journald/input.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index 5f36a1bcdd19..c32d677ffa44 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -144,7 +144,7 @@ func (inp *journald) Run( mode, pos := seekBy(ctx.Logger, currentCheckpoint, inp.Seek, inp.CursorSeekFallback) if mode == journalread.SeekSince { - err = reader.SeekRealtimeUsec(uint64(time.Now().Add(*inp.Since).UnixNano() / 1000)) + err = reader.SeekRealtimeUsec(uint64(time.Now().Add(*inp.Since).UnixMicro())) } else { err = reader.Seek(mode, pos) } From fd0a9d799041144b983725674f3660020baaa39d Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Sat, 27 May 2023 12:03:37 +0930 Subject: [PATCH 7/9] add test to demonstrate validated config use safety --- filebeat/input/journald/config_test.go | 136 ++++++++++++++++--------- 1 file changed, 86 insertions(+), 50 deletions(-) diff --git a/filebeat/input/journald/config_test.go b/filebeat/input/journald/config_test.go index 68e65fc1e8cc..cf6abe848b54 100644 --- a/filebeat/input/journald/config_test.go +++ b/filebeat/input/journald/config_test.go @@ -30,6 +30,7 @@ import ( jr "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalread" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" ) func TestConfigIncludeMatches(t *testing.T) { @@ -68,59 +69,94 @@ include_matches: } func TestConfigValidate(t *testing.T) { - nameOf := [...]string{ - jr.SeekInvalid: "invalid", - jr.SeekHead: "head", - jr.SeekTail: "tail", - jr.SeekCursor: "cursor", - jr.SeekSince: "since", - } + t.Run("table", func(t *testing.T) { + + nameOf := [...]string{ + jr.SeekInvalid: "invalid", + jr.SeekHead: "head", + jr.SeekTail: "tail", + jr.SeekCursor: "cursor", + jr.SeekSince: "since", + } - modes := []jr.SeekMode{ - jr.SeekInvalid, - jr.SeekHead, - jr.SeekTail, - jr.SeekCursor, - jr.SeekSince, - } - const n = jr.SeekSince + 1 - - errSeek := errInvalidSeek - errFall := errInvalidSeekFallback - errSince := errInvalidSeekSince - // Want is the tables of expectations: seek in major, fallback in minor. - want := map[bool][n][n]error{ - false: { // No since option set. - jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek}, - jr.SeekHead: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil}, - jr.SeekTail: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil}, - jr.SeekCursor: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: errSince}, - jr.SeekSince: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince}, - }, - true: { // Since option set. - jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek}, - jr.SeekHead: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince}, - jr.SeekTail: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince}, - jr.SeekCursor: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: nil}, - jr.SeekSince: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil}, - }, - } + modes := []jr.SeekMode{ + jr.SeekInvalid, + jr.SeekHead, + jr.SeekTail, + jr.SeekCursor, + jr.SeekSince, + } + const n = jr.SeekSince + 1 + + errSeek := errInvalidSeek + errFall := errInvalidSeekFallback + errSince := errInvalidSeekSince + // Want is the tables of expectations: seek in major, fallback in minor. + want := map[bool][n][n]error{ + false: { // No since option set. + jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek}, + jr.SeekHead: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil}, + jr.SeekTail: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil}, + jr.SeekCursor: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: errSince}, + jr.SeekSince: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince}, + }, + true: { // Since option set. + jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek}, + jr.SeekHead: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince}, + jr.SeekTail: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince}, + jr.SeekCursor: {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: nil}, + jr.SeekSince: {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil}, + }, + } - for setSince := range want { - for _, seek := range modes { - for _, fallback := range modes { - name := fmt.Sprintf("seek_%s_fallback_%s_since_%t", nameOf[seek], nameOf[fallback], setSince) - t.Run(name, func(t *testing.T) { - cfg := config{Seek: seek, CursorSeekFallback: fallback} - if setSince { - cfg.Since = new(time.Duration) - } - got := cfg.Validate() - if !errors.Is(got, want[setSince][seek][fallback]) { - t.Errorf("unexpected error: got:%v want:%v", got, want[setSince][seek][fallback]) + for setSince := range want { + for _, seek := range modes { + for _, fallback := range modes { + name := fmt.Sprintf("seek_%s_fallback_%s_since_%t", nameOf[seek], nameOf[fallback], setSince) + t.Run(name, func(t *testing.T) { + cfg := config{Seek: seek, CursorSeekFallback: fallback} + if setSince { + cfg.Since = new(time.Duration) + } + got := cfg.Validate() + if !errors.Is(got, want[setSince][seek][fallback]) { + t.Errorf("unexpected error: got:%v want:%v", got, want[setSince][seek][fallback]) + } + }) + } + } + } + }) + + t.Run("use", func(t *testing.T) { + logger := logp.L() + for seek := jr.SeekInvalid; seek <= jr.SeekSince+1; seek++ { + for seekFallback := jr.SeekInvalid; seekFallback <= jr.SeekSince+1; seekFallback++ { + for _, since := range []*time.Duration{nil, new(time.Duration)} { + for _, pos := range []string{"", "defined"} { + // Construct a config with fields checked by Validate. + cfg := config{ + Since: since, + Seek: seek, + CursorSeekFallback: seekFallback, + } + if err := cfg.Validate(); err != nil { + continue + } + + // Confirm we never get to seek since mode with a nil since. + cp := checkpoint{Position: pos} + mode, _ := seekBy(logger, cp, cfg.Seek, cfg.CursorSeekFallback) + if mode == jr.SeekSince { + if cfg.Since == nil { + // If we reach here we would have panicked in Run. + t.Errorf("got nil since in valid seek since mode: seek=%d seek_fallback=%d since=%d cp=%+v", + seek, seekFallback, since, cp) + } + } } - }) + } } } - } + }) } From 2031ba041db8a3df4d950f2dbffa4fdd082979ff Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 23 Jun 2023 11:15:47 -0400 Subject: [PATCH 8/9] Add tests for 'seek: since' Add two test cases that verify "seek: since" and "cursor_seek_fallback: since" are working. --- .../input/journald/input_filtering_test.go | 58 ++++++++++++------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go index 7724c6285fac..a985f1f52583 100644 --- a/filebeat/input/journald/input_filtering_test.go +++ b/filebeat/input/journald/input_filtering_test.go @@ -23,6 +23,7 @@ import ( "context" "path" "testing" + "time" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -219,6 +220,20 @@ func TestInputIncludeMatches(t *testing.T) { // TestInputSeek test the output of various seek modes while reading // from input-multiline-parser.journal. func TestInputSeek(t *testing.T) { + // timeOfFirstEvent is the @timestamp on the "pam_unix" message. + var timeOfFirstEvent = time.Date(2021, time.November, 22, 17, 10, 4, 51729000, time.UTC) + + var allMessages = []string{ + "pam_unix(sudo:session): session closed for user root", + "Started Outputs some log lines.", + "1st line", + "2nd line", + "3rd line", + "4th line", + "5th line", + "6th line", + } + tests := map[string]struct { config mapstr.M expectedMessages []string @@ -227,16 +242,7 @@ func TestInputSeek(t *testing.T) { config: map[string]any{ "seek": "head", }, - expectedMessages: []string{ - "pam_unix(sudo:session): session closed for user root", - "Started Outputs some log lines.", - "1st line", - "2nd line", - "3rd line", - "4th line", - "5th line", - "6th line", - }, + expectedMessages: allMessages, }, "seek tail": { config: map[string]any{ @@ -248,24 +254,34 @@ func TestInputSeek(t *testing.T) { config: map[string]any{ "seek": "cursor", }, - expectedMessages: []string{ - "pam_unix(sudo:session): session closed for user root", - "Started Outputs some log lines.", - "1st line", - "2nd line", - "3rd line", - "4th line", - "5th line", - "6th line", - }, + expectedMessages: allMessages, }, - "seek cursor fallback": { + "seek cursor with tail fallback": { config: map[string]any{ "seek": "cursor", "cursor_seek_fallback": "tail", }, expectedMessages: nil, // No messages are expected because it will fall back to seek=tail. }, + "seek since": { + config: map[string]any{ + "seek": "since", + // Query using one microsecond after the first event so that the first event + // is not returned. Note that journald uses microsecond precision for times. + "since": -1 * time.Since(timeOfFirstEvent.Add(time.Microsecond)), + }, + expectedMessages: allMessages[1:], + }, + "seek cursor with since fallback": { + config: map[string]any{ + "seek": "cursor", + "cursor_seek_fallback": "since", + // Query using one microsecond after the first event so that the first event + // is not returned. Note that journald uses microsecond precision for times. + "since": -1 * time.Since(timeOfFirstEvent.Add(time.Microsecond)), + }, + expectedMessages: allMessages[1:], + }, } for name, testCase := range tests { From a1e810fa3c5b89564bfdf506d26b72d73b1863cb Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 23 Jun 2023 11:19:56 -0400 Subject: [PATCH 9/9] docs: add common 'since' use case example --- filebeat/docs/inputs/input-journald.asciidoc | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/filebeat/docs/inputs/input-journald.asciidoc b/filebeat/docs/inputs/input-journald.asciidoc index d3302cf1c3b7..5b932b4d1330 100644 --- a/filebeat/docs/inputs/input-journald.asciidoc +++ b/filebeat/docs/inputs/input-journald.asciidoc @@ -147,6 +147,16 @@ A time offset from the current time to start reading from. To use `since`, either the `seek` option must be set to `since`, or the `seek` mode must be set to `cursor` and the `cursor_seek_fallback` set to `since`. +This example demonstrates how to resume from the persisted cursor when +it exists, or otherwise begin reading logs from the last 24 hours. + +["source","yaml",subs="attributes"] +---- +seek: cursor +cursor_seek_fallback: since +since: -24h +---- + [float] [id="{beatname_lc}-input-{type}-units"] ==== `units`