From 181db6bf1f156d2bef01f86050a910b9ae824360 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Wed, 28 Nov 2018 18:41:24 +0100 Subject: [PATCH] Add cursor_seek_fallback option (#9234) New option is introduced to configure the fallback method of seek mode `"cursor"`. The option is named `cursor_seek_fallback`. The name is taken from the community Journalbeat: https://github.com/mheese/journalbeat/blob/master/config/journalbeat.yml#L3 By default it seeks to the beginning as before. But from now on it is possible to configure it to start reading from the end of the journal. --- _meta/beat.yml | 2 ++ input/config.go | 9 ++++--- input/input.go | 22 +++++++++-------- journalbeat.reference.yml | 2 ++ journalbeat.yml | 2 ++ reader/config.go | 2 ++ reader/journal.go | 12 ++++++++-- tests/system/config/journalbeat.yml.j2 | 5 +++- tests/system/test_base.py | 33 ++++++++++++++++++++++++-- 9 files changed, 71 insertions(+), 18 deletions(-) diff --git a/_meta/beat.yml b/_meta/beat.yml index 78c28ffe36d9..0b7b4fd64ab9 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -25,6 +25,8 @@ journalbeat.inputs: # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor + # Fallback position if no cursor data is available. + #cursor_seek_fallback: head # Exact matching for field values of events. # Matching for nginx entries: "systemd.unit=nginx" diff --git a/input/config.go b/input/config.go index 5bdbfcd2ec95..9c152daf272b 100644 --- a/input/config.go +++ b/input/config.go @@ -36,6 +36,8 @@ type Config struct { MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` // Seek is the method to read from journals. Seek config.SeekMode `config:"seek"` + // CursorSeekFallback sets where to seek if registry file is not available. + CursorSeekFallback config.SeekMode `config:"cursor_seek_fallback"` // Matches store the key value pairs to match entries. Matches []string `config:"include_matches"` @@ -48,8 +50,9 @@ type Config struct { var ( // DefaultConfig is the defaults for an inputs DefaultConfig = Config{ - Backoff: 1 * time.Second, - MaxBackoff: 20 * time.Second, - Seek: config.SeekCursor, + Backoff: 1 * time.Second, + MaxBackoff: 20 * time.Second, + Seek: config.SeekCursor, + CursorSeekFallback: config.SeekHead, } ) diff --git a/input/input.go b/input/input.go index 094d169a4ca8..b59d475316a6 100644 --- a/input/input.go +++ b/input/input.go @@ -67,11 +67,12 @@ func New( var readers []*reader.Reader if len(config.Paths) == 0 { cfg := reader.Config{ - Path: reader.LocalSystemJournalID, // used to identify the state in the registry - Backoff: config.Backoff, - MaxBackoff: config.MaxBackoff, - Seek: config.Seek, - Matches: config.Matches, + Path: reader.LocalSystemJournalID, // used to identify the state in the registry + Backoff: config.Backoff, + MaxBackoff: config.MaxBackoff, + Seek: config.Seek, + CursorSeekFallback: config.CursorSeekFallback, + Matches: config.Matches, } state := states[reader.LocalSystemJournalID] @@ -84,11 +85,12 @@ func New( for _, p := range config.Paths { cfg := reader.Config{ - Path: p, - Backoff: config.Backoff, - MaxBackoff: config.MaxBackoff, - Seek: config.Seek, - Matches: config.Matches, + Path: p, + Backoff: config.Backoff, + MaxBackoff: config.MaxBackoff, + Seek: config.Seek, + CursorSeekFallback: config.CursorSeekFallback, + Matches: config.Matches, } state := states[p] r, err := reader.New(cfg, done, state, logger) diff --git a/journalbeat.reference.yml b/journalbeat.reference.yml index 59b6d2d813c8..b5b03a133dc8 100644 --- a/journalbeat.reference.yml +++ b/journalbeat.reference.yml @@ -25,6 +25,8 @@ journalbeat.inputs: # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor + # Fallback position if no cursor data is available. + #cursor_seek_fallback: head # Exact matching for field values of events. # Matching for nginx entries: "systemd.unit=nginx" diff --git a/journalbeat.yml b/journalbeat.yml index 269d519aa3c6..414e249382aa 100644 --- a/journalbeat.yml +++ b/journalbeat.yml @@ -25,6 +25,8 @@ journalbeat.inputs: # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor + # Fallback position if no cursor data is available. + #cursor_seek_fallback: head # Exact matching for field values of events. # Matching for nginx entries: "systemd.unit=nginx" diff --git a/reader/config.go b/reader/config.go index 7d52ff7422df..c0fe243894f7 100644 --- a/reader/config.go +++ b/reader/config.go @@ -30,6 +30,8 @@ type Config struct { // Seek specifies the seeking stategy. // Possible values: head, tail, cursor. Seek config.SeekMode + // CursorSeekFallback sets where to seek if registry file is not available. + CursorSeekFallback config.SeekMode // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration // Backoff is the current interval to wait before diff --git a/reader/journal.go b/reader/journal.go index 8df68170fcda..86082980903f 100644 --- a/reader/journal.go +++ b/reader/journal.go @@ -146,8 +146,16 @@ func (r *Reader) seek(cursor string) { switch r.config.Seek { case config.SeekCursor: if cursor == "" { - r.journal.SeekHead() - r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning") + switch r.config.CursorSeekFallback { + case config.SeekHead: + r.journal.SeekHead() + r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning") + case config.SeekTail: + r.journal.SeekTail() + r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the end") + default: + r.logger.Error("Invalid option for cursor_seek_fallback") + } return } r.journal.SeekCursor(cursor) diff --git a/tests/system/config/journalbeat.yml.j2 b/tests/system/config/journalbeat.yml.j2 index ca8cc1d862cf..68861a6c5ce2 100644 --- a/tests/system/config/journalbeat.yml.j2 +++ b/tests/system/config/journalbeat.yml.j2 @@ -2,7 +2,10 @@ journalbeat.inputs: - paths: [{{ journal_path }}] seek: {{ seek_method }} - matches: [{{ matches }}] + {% if cursor_seek_fallback %} + cursor_seek_fallback: {{ cursor_seek_fallback }} + {% endif %} + include_matches: [{{ matches }}] journalbeat.registry: {{ registry_file }} diff --git a/tests/system/test_base.py b/tests/system/test_base.py index be68e6c08c3b..84688f3e40c5 100644 --- a/tests/system/test_base.py +++ b/tests/system/test_base.py @@ -19,8 +19,7 @@ def test_start_with_local_journal(self): ) journalbeat_proc = self.start_beat() - self.wait_until(lambda: self.log_contains( - "journalbeat is running"), max_timeout=10) + self.wait_until(lambda: self.log_contains("journalbeat is running")) exit_code = journalbeat_proc.kill_and_wait() assert exit_code == 0 @@ -33,6 +32,7 @@ def test_start_with_journal_directory(self): self.render_config_template( journal_path=self.beat_path + "/tests/system/input/", + seek_method="tail", path=os.path.abspath(self.working_dir) + "/log/*" ) journalbeat_proc = self.start_beat() @@ -78,6 +78,35 @@ def test_start_with_selected_journal_file(self): exit_code = journalbeat_proc.kill_and_wait() assert exit_code == 0 + @unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux") + def test_start_with_selected_journal_file_with_cursor_fallback(self): + """ + Journalbeat is able to open a journal file and start to read it from the position configured by seek and cursor_seek_fallback. + """ + + self.render_config_template( + journal_path=self.beat_path + "/tests/system/input/test.journal", + seek_method="cursor", + cursor_seek_fallback="tail", + path=os.path.abspath(self.working_dir) + "/log/*" + ) + journalbeat_proc = self.start_beat() + + required_log_snippets = [ + # journalbeat can be started + "journalbeat is running", + # journalbeat can seek to the position defined in cursor_seek_fallback. + "Seeking method set to cursor, but no state is saved for reader. Starting to read from the end", + # message can be read from test journal + "\"message\": \"thinkpad_acpi: please report the conditions when this event happened to ibm-acpi-devel@lists.sourceforge.net\"", + ] + for snippet in required_log_snippets: + self.wait_until(lambda: self.log_contains(snippet), + name="Line in '{}' Journalbeat log".format(snippet)) + + exit_code = journalbeat_proc.kill_and_wait() + assert exit_code == 0 + @unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux") def test_read_events_with_existing_registry(self): """