diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3dad0677f197..123ac9d03fd3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -227,6 +227,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415 - Add execution budget to CEL input. {pull}35409[35409] - Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235] - Add delegated account support when using Google ADC in `httpjson` input. {pull}35507[35507] +- Add `since` seek mode to the journald input to start reading entries from a relative time offset. {pull}35408[35408] - Add metrics for filestream input. {pull}35529[35529] - Add support for collecting `httpjson` metrics. {pull}35392[35392] - Add XML decoding support to CEL. {issue}34438[34438] {pull}35372[35372] diff --git a/filebeat/docs/inputs/input-journald.asciidoc b/filebeat/docs/inputs/input-journald.asciidoc index 1eb161deef79..5b932b4d1330 100644 --- a/filebeat/docs/inputs/input-journald.asciidoc +++ b/filebeat/docs/inputs/input-journald.asciidoc @@ -126,6 +126,7 @@ The position to start reading the journal from. Valid settings are: will be sent until a new message is written. * `cursor`: On first read, starts reading at the beginning of the journal. After a reload or restart, continues reading at the last known position. +* `since`: Use the `since` option to determine where to start reading from. If you have old log files and want to skip lines, start {beatname_uc} with `seek: tail` specified. Then stop {beatname_uc}, set `seek: cursor`, and restart @@ -136,7 +137,25 @@ If you have old log files and want to skip lines, start {beatname_uc} with ==== `cursor_seek_fallback` The position to start reading the journal from if no cursor information is -available. Valid options are `head` and `tail`. +available. Valid options are `head`, `tail` and `since`. + +[float] +[id="{beatname_lc}-input-{type}-since"] +==== `since` + +A time offset, relative to the current time, at which to start reading. Use a negative duration such as `-24h` to start from a point in the past. 
To use +`since`, either the `seek` option must be set to `since`, or the `seek` mode +must be set to `cursor` and the `cursor_seek_fallback` set to `since`. + +This example demonstrates how to resume from the persisted cursor when +it exists, or otherwise begin reading logs from the last 24 hours. + +["source","yaml",subs="attributes"] +---- +seek: cursor +cursor_seek_fallback: since +since: -24h +---- [float] [id="{beatname_lc}-input-{type}-units"] diff --git a/filebeat/input/journald/config.go b/filebeat/input/journald/config.go index 60f5881341b8..5f18cbce211f 100644 --- a/filebeat/input/journald/config.go +++ b/filebeat/input/journald/config.go @@ -48,6 +48,10 @@ type config struct { // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` + // Since is the relative time offset from now to provide journal + // entries from. If Since is nil, no offset is applied. + Since *time.Duration `config:"since"` + // Seek is the method to read from journals. 
Seek journalread.SeekMode `config:"seek"`

@@ -100,7 +104,11 @@ func (im *bwcIncludeMatches) Unpack(c *ucfg.Config) error {
 	return c.Unpack((*journalfield.IncludeMatches)(im))
 }
 
-var errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback")
+var (
+	errInvalidSeekFallback = errors.New("invalid setting for cursor_seek_fallback")
+	errInvalidSeek         = errors.New("invalid setting for seek")
+	errInvalidSeekSince    = errors.New("incompatible setting for since and seek or cursor_seek_fallback")
+)
 
 func defaultConfig() config {
 	return config{
@@ -113,8 +121,23 @@ func defaultConfig() config {
 }
 
+// Validate checks that seek and cursor_seek_fallback hold usable modes and
+// that since is configured exactly when one of those modes will consume it.
 func (c *config) Validate() error {
-	if c.CursorSeekFallback != journalread.SeekHead && c.CursorSeekFallback != journalread.SeekTail {
+	if c.Seek == journalread.SeekInvalid {
+		return errInvalidSeek
+	}
+	switch c.CursorSeekFallback {
+	case journalread.SeekHead, journalread.SeekTail, journalread.SeekSince:
+	default:
 		return errInvalidSeekFallback
 	}
+	// since is consumed when seeking by time directly, or when seek=cursor
+	// may fall back to seeking by time.
+	sinceUsed := c.Seek == journalread.SeekSince ||
+		(c.Seek == journalread.SeekCursor && c.CursorSeekFallback == journalread.SeekSince)
+	// Reject both a required since that is missing and a since that would be ignored.
+	if sinceUsed != (c.Since != nil) {
+		return errInvalidSeekSince
+	}
 	return nil
 }
diff --git a/filebeat/input/journald/config_test.go b/filebeat/input/journald/config_test.go
index e5205d8161dd..cf6abe848b54 100644
--- a/filebeat/input/journald/config_test.go
+++ b/filebeat/input/journald/config_test.go
@@ -20,12 +20,17 @@
 package journald
 
 import (
+	"errors"
+	"fmt"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	jr "github.com/elastic/beats/v7/filebeat/input/journald/pkg/journalread"
 	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
 )
 
 func TestConfigIncludeMatches(t *testing.T) {
@@ -62,3 
+67,102 @@ include_matches:
 		verify(t, yaml)
 	})
 }
+
+// TestConfigValidate checks config.Validate against every combination of
+// seek mode, cursor_seek_fallback mode and presence of since, and confirms
+// that no accepted configuration can reach SeekSince with a nil since.
+func TestConfigValidate(t *testing.T) {
+	t.Run("table", func(t *testing.T) {
+
+		nameOf := [...]string{
+			jr.SeekInvalid: "invalid",
+			jr.SeekHead:    "head",
+			jr.SeekTail:    "tail",
+			jr.SeekCursor:  "cursor",
+			jr.SeekSince:   "since",
+		}
+
+		modes := []jr.SeekMode{
+			jr.SeekInvalid,
+			jr.SeekHead,
+			jr.SeekTail,
+			jr.SeekCursor,
+			jr.SeekSince,
+		}
+		const n = jr.SeekSince + 1
+
+		errSeek := errInvalidSeek
+		errFall := errInvalidSeekFallback
+		errSince := errInvalidSeekSince
+		// Want is the tables of expectations: seek in major, fallback in minor.
+		want := map[bool][n][n]error{
+			false: { // No since option set.
+				jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek},
+				jr.SeekHead:    {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil},
+				jr.SeekTail:    {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil},
+				jr.SeekCursor:  {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: errSince},
+				jr.SeekSince:   {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince},
+			},
+			true: { // Since option set.
+				jr.SeekInvalid: {jr.SeekInvalid: errSeek, jr.SeekHead: errSeek, jr.SeekTail: errSeek, jr.SeekCursor: errSeek, jr.SeekSince: errSeek},
+				jr.SeekHead:    {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince},
+				jr.SeekTail:    {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: errSince},
+				jr.SeekCursor:  {jr.SeekInvalid: errFall, jr.SeekHead: errSince, jr.SeekTail: errSince, jr.SeekCursor: errFall, jr.SeekSince: nil},
+				jr.SeekSince:   {jr.SeekInvalid: errFall, jr.SeekHead: nil, jr.SeekTail: nil, jr.SeekCursor: errFall, jr.SeekSince: nil},
+			},
+		}
+
+		for setSince := range want {
+			for _, seek := range modes {
+				for _, fallback := range modes {
+					name := fmt.Sprintf("seek_%s_fallback_%s_since_%t", nameOf[seek], nameOf[fallback], setSince)
+					t.Run(name, func(t *testing.T) {
+						cfg := config{Seek: seek, CursorSeekFallback: fallback}
+						if setSince {
+							cfg.Since = new(time.Duration)
+						}
+						got := cfg.Validate()
+						if !errors.Is(got, want[setSince][seek][fallback]) {
+							t.Errorf("unexpected error: got:%v want:%v", got, want[setSince][seek][fallback])
+						}
+					})
+				}
+			}
+		}
+	})
+
+	t.Run("use", func(t *testing.T) {
+		logger := logp.L()
+		// The bounds run one past SeekSince so an out-of-range mode is tried as well.
+		for seek := jr.SeekInvalid; seek <= jr.SeekSince+1; seek++ {
+			for seekFallback := jr.SeekInvalid; seekFallback <= jr.SeekSince+1; seekFallback++ {
+				for _, since := range []*time.Duration{nil, new(time.Duration)} {
+					for _, pos := range []string{"", "defined"} {
+						// Construct a config with fields checked by Validate.
+						cfg := config{
+							Since:              since,
+							Seek:               seek,
+							CursorSeekFallback: seekFallback,
+						}
+						if err := cfg.Validate(); err != nil {
+							continue
+						}
+
+						// Confirm we never get to seek since mode with a nil since.
+						cp := checkpoint{Position: pos}
+						mode, _ := seekBy(logger, cp, cfg.Seek, cfg.CursorSeekFallback)
+						if mode == jr.SeekSince {
+							if cfg.Since == nil {
+								// If we reach here we would have panicked in Run.
+								// since is a pointer, so it is printed with %v: a nil
+								// value renders as <nil>, where %d would print 0.
+								t.Errorf("got nil since in valid seek since mode: seek=%d seek_fallback=%d since=%v cp=%+v",
+									seek, seekFallback, since, cp)
+							}
+						}
+					}
+				}
+			}
+		}
+	})
+}
diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go
index e4d3d858dd65..c32d677ffa44 100644
--- a/filebeat/input/journald/input.go
+++ b/filebeat/input/journald/input.go
@@ -40,6 +40,7 @@ import (
 type journald struct {
 	Backoff            time.Duration
 	MaxBackoff         time.Duration
+	Since              *time.Duration
 	Seek               journalread.SeekMode
 	CursorSeekFallback journalread.SeekMode
 	Matches            journalfield.IncludeMatches
@@ -104,6 +105,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
 	return sources, &journald{
 		Backoff:            config.Backoff,
 		MaxBackoff:         config.MaxBackoff,
+		Since:              config.Since,
 		Seek:               config.Seek,
 		CursorSeekFallback: config.CursorSeekFallback,
 		Matches:            journalfield.IncludeMatches(config.Matches),
@@ -140,7 +142,13 @@ func (inp *journald) Run(
 	}
 	defer reader.Close()
 
-	if err := reader.Seek(seekBy(ctx.Logger, currentCheckpoint, inp.Seek, inp.CursorSeekFallback)); err != nil {
+	mode, pos := seekBy(ctx.Logger, currentCheckpoint, inp.Seek, inp.CursorSeekFallback)
+	if mode == journalread.SeekSince {
+		err = reader.SeekRealtimeUsec(uint64(time.Now().Add(*inp.Since).UnixMicro()))
+	} else {
+		err = reader.Seek(mode, pos)
+	}
+	if err != nil {
 		log.Error("Continue from current position. 
Seek failed with: %v", err) } @@ -168,7 +176,10 @@ func (inp *journald) Run( func (inp *journald) open(log *logp.Logger, canceler input.Canceler, src cursor.Source) (*journalread.Reader, error) { backoff := backoff.NewExpBackoff(canceler.Done(), inp.Backoff, inp.MaxBackoff) reader, err := journalread.Open(log, src.Name(), backoff, - withFilters(inp.Matches), withUnits(inp.Units), withTransports(inp.Transports), withSyslogIdentifiers(inp.Identifiers)) + withFilters(inp.Matches), + withUnits(inp.Units), + withTransports(inp.Transports), + withSyslogIdentifiers(inp.Identifiers)) if err != nil { return nil, sderr.Wrap(err, "failed to create reader for %{path} journal", src.Name()) } @@ -223,12 +234,14 @@ func withSyslogIdentifiers(identifiers []string) func(*sdjournal.Journal) error // seekBy tries to find the last known position in the journal, so we can continue collecting // from the last known position. // The checkpoint is ignored if the user has configured the input to always -// seek to the head/tail of the journal on startup. -func seekBy(log *logp.Logger, cp checkpoint, seek, defaultSeek journalread.SeekMode) (journalread.SeekMode, string) { - mode := seek +// seek to the head/tail/since of the journal on startup. 
+func seekBy(log *logp.Logger, cp checkpoint, seek, defaultSeek journalread.SeekMode) (mode journalread.SeekMode, pos string) { + mode = seek if mode == journalread.SeekCursor && cp.Position == "" { mode = defaultSeek - if mode != journalread.SeekHead && mode != journalread.SeekTail { + switch mode { + case journalread.SeekHead, journalread.SeekTail, journalread.SeekSince: + default: log.Error("Invalid option for cursor_seek_fallback") mode = journalread.SeekHead } diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go index 7724c6285fac..a985f1f52583 100644 --- a/filebeat/input/journald/input_filtering_test.go +++ b/filebeat/input/journald/input_filtering_test.go @@ -23,6 +23,7 @@ import ( "context" "path" "testing" + "time" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -219,6 +220,20 @@ func TestInputIncludeMatches(t *testing.T) { // TestInputSeek test the output of various seek modes while reading // from input-multiline-parser.journal. func TestInputSeek(t *testing.T) { + // timeOfFirstEvent is the @timestamp on the "pam_unix" message. 
+ var timeOfFirstEvent = time.Date(2021, time.November, 22, 17, 10, 4, 51729000, time.UTC) + + var allMessages = []string{ + "pam_unix(sudo:session): session closed for user root", + "Started Outputs some log lines.", + "1st line", + "2nd line", + "3rd line", + "4th line", + "5th line", + "6th line", + } + tests := map[string]struct { config mapstr.M expectedMessages []string @@ -227,16 +242,7 @@ func TestInputSeek(t *testing.T) { config: map[string]any{ "seek": "head", }, - expectedMessages: []string{ - "pam_unix(sudo:session): session closed for user root", - "Started Outputs some log lines.", - "1st line", - "2nd line", - "3rd line", - "4th line", - "5th line", - "6th line", - }, + expectedMessages: allMessages, }, "seek tail": { config: map[string]any{ @@ -248,24 +254,34 @@ func TestInputSeek(t *testing.T) { config: map[string]any{ "seek": "cursor", }, - expectedMessages: []string{ - "pam_unix(sudo:session): session closed for user root", - "Started Outputs some log lines.", - "1st line", - "2nd line", - "3rd line", - "4th line", - "5th line", - "6th line", - }, + expectedMessages: allMessages, }, - "seek cursor fallback": { + "seek cursor with tail fallback": { config: map[string]any{ "seek": "cursor", "cursor_seek_fallback": "tail", }, expectedMessages: nil, // No messages are expected because it will fall back to seek=tail. }, + "seek since": { + config: map[string]any{ + "seek": "since", + // Query using one microsecond after the first event so that the first event + // is not returned. Note that journald uses microsecond precision for times. + "since": -1 * time.Since(timeOfFirstEvent.Add(time.Microsecond)), + }, + expectedMessages: allMessages[1:], + }, + "seek cursor with since fallback": { + config: map[string]any{ + "seek": "cursor", + "cursor_seek_fallback": "since", + // Query using one microsecond after the first event so that the first event + // is not returned. Note that journald uses microsecond precision for times. 
+ "since": -1 * time.Since(timeOfFirstEvent.Add(time.Microsecond)), + }, + expectedMessages: allMessages[1:], + }, } for name, testCase := range tests { diff --git a/filebeat/input/journald/pkg/journalread/mode.go b/filebeat/input/journald/pkg/journalread/mode.go index 36132ffe11d6..3c6fa923361a 100644 --- a/filebeat/input/journald/pkg/journalread/mode.go +++ b/filebeat/input/journald/pkg/journalread/mode.go @@ -31,12 +31,15 @@ const ( SeekTail // SeekCursor option seeks to the position specified in the cursor SeekCursor + // SeekSince option seeks to the position specified by the since option + SeekSince ) var seekModes = map[string]SeekMode{ "head": SeekHead, "tail": SeekTail, "cursor": SeekCursor, + "since": SeekSince, } // Unpack validates and unpack "seek" config options. It returns an error if diff --git a/filebeat/input/journald/pkg/journalread/mode_test.go b/filebeat/input/journald/pkg/journalread/mode_test.go index aef0ed4150cc..7b323a06be58 100644 --- a/filebeat/input/journald/pkg/journalread/mode_test.go +++ b/filebeat/input/journald/pkg/journalread/mode_test.go @@ -27,6 +27,7 @@ func TestMode_Unpack(t *testing.T) { "head": SeekHead, "tail": SeekTail, "cursor": SeekCursor, + "since": SeekSince, } for str, want := range tests { diff --git a/filebeat/input/journald/pkg/journalread/reader.go b/filebeat/input/journald/pkg/journalread/reader.go index 571de214b557..9994c0aad7c1 100644 --- a/filebeat/input/journald/pkg/journalread/reader.go +++ b/filebeat/input/journald/pkg/journalread/reader.go @@ -54,6 +54,7 @@ type journal interface { GetEntry() (*sdjournal.JournalEntry, error) SeekHead() error SeekTail() error + SeekRealtimeUsec(usec uint64) error SeekCursor(string) error } @@ -146,6 +147,13 @@ func (r *Reader) Seek(mode SeekMode, cursor string) (err error) { return err } +// SeekRealtimeUsec moves the read pointer to the entry with the +// specified CLOCK_REALTIME timestamp. This corresponds to +// sd_journal_seek_realtime_usec. 
+func (r *Reader) SeekRealtimeUsec(usec uint64) error {
+	return r.journal.SeekRealtimeUsec(usec) // forwarded as-is to the underlying journal; its error is returned unchanged
+}
+
 // Next reads a new journald entry from the journal. It blocks if there is
 // currently no entry available in the journal, or until an error has occurred.
 func (r *Reader) Next(cancel canceler) (*sdjournal.JournalEntry, error) {