Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filebeat/input/journald: allow specifying since when to read journald entries #35408

Merged
merged 11 commits into from
Jul 27, 2023
10 changes: 10 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,16 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add XML decoding support to HTTPJSON. {issue}34438[34438] {pull}35235[35235]
- Add delegated account support when using Google ADC in `httpjson` input. {pull}35507[35507]
- Allow specifying since when to read journald entries. {pull}35408[35408]
- Add metrics for filestream input. {pull}35529[35529]
- Add support for collecting `httpjson` metrics. {pull}35392[35392]
- Add XML decoding support to CEL. {issue}34438[34438] {pull}35372[35372]
- Mark CEL input as GA. {pull}35559[35559]
- Add metrics for gcp-pubsub input. {pull}35614[35614]
- [GCS] Added scheduler debug logs and improved the context passing mechanism by removing them from struct params and passing them as function arguments. {pull}35674[35674]
- Allow non-AWS endpoints for awss3 input. {issue}35496[35496] {pull}35520[35520]
- Add Okta input package for entity analytics. {pull}35611[35611]
- Expose harvester metrics from filestream input {pull}35835[35835] {issue}33771[33771]
- Add device support for Azure AD entity analytics. {pull}35807[35807]
belimawr marked this conversation as resolved.
Show resolved Hide resolved

*Auditbeat*
- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]
Expand Down
10 changes: 10 additions & 0 deletions filebeat/docs/inputs/input-journald.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ A time offset from the current time to start reading from. To use
`since`, either the `seek` option must be set to `since`, or the `seek` mode
must be set to `cursor` and the `cursor_seek_fallback` set to `since`.

This example demonstrates how to resume from the persisted cursor when
it exists, or otherwise begin reading logs from the last 24 hours.

["source","yaml",subs="attributes"]
----
seek: cursor
cursor_seek_fallback: since
since: -24h
----

[float]
[id="{beatname_lc}-input-{type}-units"]
==== `units`
Expand Down
58 changes: 37 additions & 21 deletions filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"path"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -219,6 +220,20 @@ func TestInputIncludeMatches(t *testing.T) {
// TestInputSeek test the output of various seek modes while reading
// from input-multiline-parser.journal.
func TestInputSeek(t *testing.T) {
// timeOfFirstEvent is the @timestamp on the "pam_unix" message.
var timeOfFirstEvent = time.Date(2021, time.November, 22, 17, 10, 4, 51729000, time.UTC)

var allMessages = []string{
"pam_unix(sudo:session): session closed for user root",
"Started Outputs some log lines.",
"1st line",
"2nd line",
"3rd line",
"4th line",
"5th line",
"6th line",
}

tests := map[string]struct {
config mapstr.M
expectedMessages []string
Expand All @@ -227,16 +242,7 @@ func TestInputSeek(t *testing.T) {
config: map[string]any{
"seek": "head",
},
expectedMessages: []string{
"pam_unix(sudo:session): session closed for user root",
"Started Outputs some log lines.",
"1st line",
"2nd line",
"3rd line",
"4th line",
"5th line",
"6th line",
},
expectedMessages: allMessages,
},
"seek tail": {
config: map[string]any{
Expand All @@ -248,24 +254,34 @@ func TestInputSeek(t *testing.T) {
config: map[string]any{
"seek": "cursor",
},
expectedMessages: []string{
"pam_unix(sudo:session): session closed for user root",
"Started Outputs some log lines.",
"1st line",
"2nd line",
"3rd line",
"4th line",
"5th line",
"6th line",
},
expectedMessages: allMessages,
},
"seek cursor fallback": {
"seek cursor with tail fallback": {
config: map[string]any{
"seek": "cursor",
"cursor_seek_fallback": "tail",
},
expectedMessages: nil, // No messages are expected because it will fall back to seek=tail.
},
"seek since": {
config: map[string]any{
"seek": "since",
// Query using one microsecond after the first event so that the first event
// is not returned. Note that journald uses microsecond precision for times.
"since": -1 * time.Since(timeOfFirstEvent.Add(time.Microsecond)),
},
expectedMessages: allMessages[1:],
},
"seek cursor with since fallback": {
config: map[string]any{
"seek": "cursor",
"cursor_seek_fallback": "since",
// Query using one microsecond after the first event so that the first event
// is not returned. Note that journald uses microsecond precision for times.
"since": -1 * time.Since(timeOfFirstEvent.Add(time.Microsecond)),
},
expectedMessages: allMessages[1:],
},
}

for name, testCase := range tests {
Expand Down