Skip to content

Commit

Permalink
Add cursor_seek_fallback option (elastic#9234)
Browse files Browse the repository at this point in the history
New option is introduced to configure the fallback method of seek mode `"cursor"`. The option is named `cursor_seek_fallback`. The name is taken from the community Journalbeat: https://github.com/mheese/journalbeat/blob/master/config/journalbeat.yml#L3

By default it seeks to the beginning as before. But from now on it is possible to configure it to start reading from the end of the journal.
  • Loading branch information
kvch authored Nov 28, 2018
1 parent 8acf9e3 commit 181db6b
Show file tree
Hide file tree
Showing 9 changed files with 71 additions and 18 deletions.
2 changes: 2 additions & 0 deletions _meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ journalbeat.inputs:

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
# Fallback position if no cursor data is available.
#cursor_seek_fallback: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
Expand Down
9 changes: 6 additions & 3 deletions input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type Config struct {
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek config.SeekMode `config:"seek"`
// CursorSeekFallback sets where to seek if registry file is not available.
CursorSeekFallback config.SeekMode `config:"cursor_seek_fallback"`
// Matches store the key value pairs to match entries.
Matches []string `config:"include_matches"`

Expand All @@ -48,8 +50,9 @@ type Config struct {
var (
// DefaultConfig is the defaults for an inputs
DefaultConfig = Config{
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
CursorSeekFallback: config.SeekHead,
}
)
22 changes: 12 additions & 10 deletions input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,12 @@ func New(
var readers []*reader.Reader
if len(config.Paths) == 0 {
cfg := reader.Config{
Path: reader.LocalSystemJournalID, // used to identify the state in the registry
Backoff: config.Backoff,
MaxBackoff: config.MaxBackoff,
Seek: config.Seek,
Matches: config.Matches,
Path: reader.LocalSystemJournalID, // used to identify the state in the registry
Backoff: config.Backoff,
MaxBackoff: config.MaxBackoff,
Seek: config.Seek,
CursorSeekFallback: config.CursorSeekFallback,
Matches: config.Matches,
}

state := states[reader.LocalSystemJournalID]
Expand All @@ -84,11 +85,12 @@ func New(

for _, p := range config.Paths {
cfg := reader.Config{
Path: p,
Backoff: config.Backoff,
MaxBackoff: config.MaxBackoff,
Seek: config.Seek,
Matches: config.Matches,
Path: p,
Backoff: config.Backoff,
MaxBackoff: config.MaxBackoff,
Seek: config.Seek,
CursorSeekFallback: config.CursorSeekFallback,
Matches: config.Matches,
}
state := states[p]
r, err := reader.New(cfg, done, state, logger)
Expand Down
2 changes: 2 additions & 0 deletions journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ journalbeat.inputs:

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
# Fallback position if no cursor data is available.
#cursor_seek_fallback: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
Expand Down
2 changes: 2 additions & 0 deletions journalbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ journalbeat.inputs:

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
# Fallback position if no cursor data is available.
#cursor_seek_fallback: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
Expand Down
2 changes: 2 additions & 0 deletions reader/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ type Config struct {
// Seek specifies the seeking stategy.
// Possible values: head, tail, cursor.
Seek config.SeekMode
// CursorSeekFallback sets where to seek if registry file is not available.
CursorSeekFallback config.SeekMode
// MaxBackoff is the limit of the backoff time.
MaxBackoff time.Duration
// Backoff is the current interval to wait before
Expand Down
12 changes: 10 additions & 2 deletions reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,16 @@ func (r *Reader) seek(cursor string) {
switch r.config.Seek {
case config.SeekCursor:
if cursor == "" {
r.journal.SeekHead()
r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning")
switch r.config.CursorSeekFallback {
case config.SeekHead:
r.journal.SeekHead()
r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning")
case config.SeekTail:
r.journal.SeekTail()
r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the end")
default:
r.logger.Error("Invalid option for cursor_seek_fallback")
}
return
}
r.journal.SeekCursor(cursor)
Expand Down
5 changes: 4 additions & 1 deletion tests/system/config/journalbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
journalbeat.inputs:
- paths: [{{ journal_path }}]
seek: {{ seek_method }}
matches: [{{ matches }}]
{% if cursor_seek_fallback %}
cursor_seek_fallback: {{ cursor_seek_fallback }}
{% endif %}
include_matches: [{{ matches }}]

journalbeat.registry: {{ registry_file }}

Expand Down
33 changes: 31 additions & 2 deletions tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ def test_start_with_local_journal(self):
)
journalbeat_proc = self.start_beat()

self.wait_until(lambda: self.log_contains(
"journalbeat is running"), max_timeout=10)
self.wait_until(lambda: self.log_contains("journalbeat is running"))

exit_code = journalbeat_proc.kill_and_wait()
assert exit_code == 0
Expand All @@ -33,6 +32,7 @@ def test_start_with_journal_directory(self):

self.render_config_template(
journal_path=self.beat_path + "/tests/system/input/",
seek_method="tail",
path=os.path.abspath(self.working_dir) + "/log/*"
)
journalbeat_proc = self.start_beat()
Expand Down Expand Up @@ -78,6 +78,35 @@ def test_start_with_selected_journal_file(self):
exit_code = journalbeat_proc.kill_and_wait()
assert exit_code == 0

@unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux")
def test_start_with_selected_journal_file_with_cursor_fallback(self):
"""
Journalbeat is able to open a journal file and start to read it from the position configured by seek and cursor_seek_fallback.
"""

self.render_config_template(
journal_path=self.beat_path + "/tests/system/input/test.journal",
seek_method="cursor",
cursor_seek_fallback="tail",
path=os.path.abspath(self.working_dir) + "/log/*"
)
journalbeat_proc = self.start_beat()

required_log_snippets = [
# journalbeat can be started
"journalbeat is running",
# journalbeat can seek to the position defined in cursor_seek_fallback.
"Seeking method set to cursor, but no state is saved for reader. Starting to read from the end",
# message can be read from test journal
"\"message\": \"thinkpad_acpi: please report the conditions when this event happened to [email protected]\"",
]
for snippet in required_log_snippets:
self.wait_until(lambda: self.log_contains(snippet),
name="Line in '{}' Journalbeat log".format(snippet))

exit_code = journalbeat_proc.kill_and_wait()
assert exit_code == 0

@unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux")
def test_read_events_with_existing_registry(self):
"""
Expand Down

0 comments on commit 181db6b

Please sign in to comment.