diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 5ca4a9b9832..be868ddabd0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -321,6 +321,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Journalbeat* +- Added an `id` config option to inputs to allow running multiple inputs on the + same journal. {pull}18467{18467} + *Metricbeat* - Move the windows pdh implementation from perfmon to a shared location in order for future modules/metricsets to make use of. {pull}15503[15503] diff --git a/Vagrantfile b/Vagrantfile index fdff82a57ea..81560b73434 100644 --- a/Vagrantfile +++ b/Vagrantfile @@ -135,7 +135,7 @@ def linuxDebianProvision() #!/usr/bin/env bash set -eio pipefail apt-get update -apt-get install -y make gcc python3 python3-pip python3-venv git +apt-get install -y make gcc python3 python3-pip python3-venv git libsystemd-dev SCRIPT end @@ -229,6 +229,11 @@ Vagrant.configure(2) do |config| c.vm.box = "ubuntu/bionic64" c.vm.network :forwarded_port, guest: 22, host: 2228, id: "ssh", auto_correct: true + c.vm.provider :virtualbox do |vbox| + vbox.memory = 4096 + vbox.cpus = 4 + end + c.vm.provision "shell", inline: $unixProvision, privileged: false c.vm.provision "shell", inline: linuxGvmProvision, privileged: false c.vm.provision "shell", inline: linuxDebianProvision diff --git a/journalbeat/_meta/config/beat.reference.yml.tmpl b/journalbeat/_meta/config/beat.reference.yml.tmpl index 3d4bc90b6f1..8da73b6bf07 100644 --- a/journalbeat/_meta/config/beat.reference.yml.tmpl +++ b/journalbeat/_meta/config/beat.reference.yml.tmpl @@ -18,6 +18,11 @@ journalbeat.inputs: # When empty starts to read from local journal. - paths: [] + # An optional unique identifier for the input. By providing a unique `id` you + # can operate multiple inputs on the same journal. This allows each input's + # cursor to be persisted independently in the registry file. + #id: "" + # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. diff --git a/journalbeat/_meta/config/beat.yml.tmpl b/journalbeat/_meta/config/beat.yml.tmpl index 9410e82a925..a4f7d5d4c31 100644 --- a/journalbeat/_meta/config/beat.yml.tmpl +++ b/journalbeat/_meta/config/beat.yml.tmpl @@ -18,6 +18,11 @@ journalbeat.inputs: # When empty starts to read from local journal. - paths: [] + # An optional unique identifier for the input. By providing a unique `id` you + # can operate multiple inputs on the same journal. This allows each input's + # cursor to be persisted independently in the registry file. + #id: "" + # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. diff --git a/journalbeat/docs/config-options.asciidoc b/journalbeat/docs/config-options.asciidoc index 816207c1ad2..106318ceb6d 100644 --- a/journalbeat/docs/config-options.asciidoc +++ b/journalbeat/docs/config-options.asciidoc @@ -75,6 +75,27 @@ in a Docker container. However, in this example the fields are matched using the You can specify the following options to configure how {beatname_uc} reads the journal files. +[float] +[id="{beatname_lc}-id"] +==== `id` + +An optional unique identifier for the input. By providing a unique `id` you can +operate multiple inputs on the same journal. This allows each input's cursor +to be persisted independently in the registry file. + +---- +{beatname_lc}.inputs: +- id: consul.service + paths: [] + include_matches: + - _SYSTEMD_UNIT=consul.service + +- id: vault.service + paths: [] + include_matches: + - _SYSTEMD_UNIT=vault.service +---- + [float] [id="{beatname_lc}-paths"] ==== `paths` @@ -108,10 +129,10 @@ The position to start reading the journal from. Valid settings are: * `head`: Starts reading at the beginning of the journal. After a restart, {beatname_uc} resends all log messages in the journal. -* `tail`: Starts reading at the end of the journal. After a restart, +* `tail`: Starts reading at the end of the journal. After a restart, {beatname_uc} resends the last message, which might result in duplicates. If multiple log messages are written to a journal while {beatname_uc} is down, -only the last log message is sent on restart. +only the last log message is sent on restart. * `cursor`: On first read, starts reading at the beginning of the journal. After a reload or restart, continues reading at the last known position. @@ -207,7 +228,7 @@ journald fields: The following translated fields for https://docs.docker.com/config/containers/logging/journald/[Docker] are also -available: +available: [horizontal] `CONTAINER_ID`:: `container.id_truncated` diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index 00eb871cb4c..63c31ccfce7 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -28,6 +28,8 @@ import ( // Config stores the options of an input. type Config struct { + // Unique ID of the input for state persistence purposes. + ID string `config:"id"` // Paths stores the paths to the journal files to be read. Paths []string `config:"paths"` // Backoff is the current interval to wait before diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index ca39b082049..368c2f52c13 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -25,8 +25,6 @@ import ( "github.com/elastic/beats/v7/libbeat/common/fmtstr" - "github.com/gofrs/uuid" - "github.com/elastic/beats/v7/journalbeat/checkpoint" "github.com/elastic/beats/v7/journalbeat/reader" "github.com/elastic/beats/v7/libbeat/beat" @@ -43,7 +41,6 @@ type Input struct { pipeline beat.Pipeline client beat.Client states map[string]checkpoint.JournalState - id uuid.UUID logger *logp.Logger eventMeta common.EventMetadata processors beat.ProcessorList @@ -61,13 +58,11 @@ func New( return nil, err } - id, err := uuid.NewV4() - if err != nil { - return nil, fmt.Errorf("error while generating ID for input: %v", err) + logger := logp.NewLogger("input") + if config.ID != "" { + logger = logger.With("id", config.ID) } - logger := logp.NewLogger("input").With("id", id) - var readers []*reader.Reader if len(config.Paths) == 0 { cfg := reader.Config{ @@ -78,9 +73,10 @@ func New( CursorSeekFallback: config.CursorSeekFallback, Matches: config.Matches, SaveRemoteHostname: config.SaveRemoteHostname, + CheckpointID: checkpointID(config.ID, reader.LocalSystemJournalID), } - state := states[reader.LocalSystemJournalID] + state := states[cfg.CheckpointID] r, err := reader.NewLocal(cfg, done, state, logger) if err != nil { return nil, fmt.Errorf("error creating reader for local journal: %v", err) @@ -97,8 +93,10 @@ func New( CursorSeekFallback: config.CursorSeekFallback, Matches: config.Matches, SaveRemoteHostname: config.SaveRemoteHostname, + CheckpointID: checkpointID(config.ID, p), } - state := states[p] + + state := states[cfg.CheckpointID] r, err := reader.New(cfg, done, state, logger) if err != nil { return nil, fmt.Errorf("error creating reader for journal: %v", err) @@ -119,7 +117,6 @@ func New( config: config, pipeline: b.Publisher, states: states, - id: id, logger: logger, eventMeta: config.EventMetadata, processors: inputProcessors, @@ -138,7 +135,7 @@ func (i *Input) Run() { Processor: i.processors, }, ACKCount: func(n int) { - i.logger.Infof("journalbeat successfully published %d events", n) + i.logger.Debugw("journalbeat successfully published events", "event.count", n) }, }) if err != nil { @@ -233,3 +230,12 @@ func processorsForInput(beatInfo beat.Info, config Config) (*processors.Processo return procs, nil } + +// checkpointID returns the identifier used to track persistent state for the +// input. +func checkpointID(id, path string) string { + if id == "" { + return path + } + return "journald::" + path + "::" + id +} diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index 7a9b0a62e2a..6824dfe28cf 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -18,6 +18,11 @@ journalbeat.inputs: # When empty starts to read from local journal. - paths: [] + # An optional unique identifier for the input. By providing a unique `id` you + # can operate multiple inputs on the same journal. This allows each input's + # cursor to be persisted independently in the registry file. + #id: "" + # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index 2dfa8a07f5f..e3d72984701 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -18,6 +18,11 @@ journalbeat.inputs: # When empty starts to read from local journal. - paths: [] + # An optional unique identifier for the input. By providing a unique `id` you + # can operate multiple inputs on the same journal. This allows each input's + # cursor to be persisted independently in the registry file. + #id: "" + # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. diff --git a/journalbeat/reader/config.go b/journalbeat/reader/config.go index d9ad5769688..f8dbabe5fee 100644 --- a/journalbeat/reader/config.go +++ b/journalbeat/reader/config.go @@ -41,6 +41,8 @@ type Config struct { Matches []string // SaveRemoteHostname defines if the original source of the entry needs to be saved. SaveRemoteHostname bool + // CheckpointID is the identifier to use when persisting state. + CheckpointID string } const ( diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index 9c599d29657..f364afe6d6b 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -267,7 +267,7 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { } state := checkpoint.JournalState{ - Path: r.config.Path, + Path: r.config.CheckpointID, Cursor: entry.Cursor, RealtimeTimestamp: entry.RealtimeTimestamp, MonotonicTimestamp: entry.MonotonicTimestamp, diff --git a/journalbeat/tests/system/config/journalbeat.yml.j2 b/journalbeat/tests/system/config/journalbeat.yml.j2 index 7486f9914ef..858181ca409 100644 --- a/journalbeat/tests/system/config/journalbeat.yml.j2 +++ b/journalbeat/tests/system/config/journalbeat.yml.j2 @@ -1,85 +1,13 @@ -################### Beat Configuration ######################### journalbeat.inputs: -- paths: [{{ journal_path }}] - seek: {{ seek_method }} - {% if cursor_seek_fallback %} - cursor_seek_fallback: {{ cursor_seek_fallback }} - {% endif %} - include_matches: [{{ matches }}] +{% for input in inputs %} +- {{ input | tojson }} +{% endfor %} +{% if registry_file is defined %} journalbeat.registry_file: {{ registry_file }} +{% endif %} -############################# Output ########################################## - -# Configure what outputs to use when sending the data collected by the beat. -# You can enable one or multiple outputs by setting enabled option to true. output: - - ### File as output file: - # Enabling file output - enabled: true - - # Path to the directory where to save the generated files. The option is mandatory. path: {{ output_file_path|default(beat.working_dir + "/output") }} - - - # Name of the generated files. The default is `journalbeat` and it generates - # files: `journalbeat`, `journalbeat.1`, `journalbeat.2`, etc. filename: {{ output_file_filename|default("journalbeat") }} - - # Maximum size in kilobytes of each file. When this size is reached, the files are - # rotated. The default value is 10 MB. - #rotate_every_kb: 10000 - - # Maximum number of files under path. When this number of files is reached, the - # oldest file is deleted and the rest are shifted from last to first. The default - # is 7 files. - #number_of_files: 7 - - - -############################# Beat ######################################### - -# The name of the shipper that publishes the network data. It can be used to group -# all the transactions sent by a single shipper in the web interface. -# If this options is not defined, the hostname is used. -#name: - -# The tags of the shipper are included in their own field with each -# transaction published. Tags make it easy to group servers by different -# logical properties. -#tags: ["service-X", "web-tier"] - - - -############################# Logging ######################################### - -#logging: - # Send all logging output to syslog. On Windows default is false, otherwise - # default is true. - #to_syslog: true - - # Write all logging output to files. Beats automatically rotate files if configurable - # limit is reached. - #to_files: false - - # Enable debug output for selected components. - #selectors: [] - - # Set log level - #level: error - - #files: - # The directory where the log files will written to. - #path: /var/log/journalbeat - - # The name of the files where the logs are written to. - #name: journalbeat - - # Configure log file size limit. If limit is reached, log file will be - # automatically rotated - #rotateeverybytes: 10485760 # = 10MB - - # Number of rotated log files to keep. Oldest files will be deleted first. - #keepfiles: 7 diff --git a/journalbeat/tests/system/test_base.py b/journalbeat/tests/system/test_base.py index c85f1ef9f16..a94d4a7473c 100644 --- a/journalbeat/tests/system/test_base.py +++ b/journalbeat/tests/system/test_base.py @@ -5,6 +5,7 @@ import unittest import time import yaml +from shutil import copyfile class Test(BaseTest): @@ -16,7 +17,9 @@ def test_start_with_local_journal(self): """ self.render_config_template( - path=os.path.abspath(self.working_dir) + "/log/*" + inputs=[{ + "paths": [], + }], ) journalbeat_proc = self.start_beat() @@ -32,9 +35,12 @@ def test_start_with_journal_directory(self): """ self.render_config_template( - journal_path=self.beat_path + "/tests/system/input/", - seek_method="tail", - path=os.path.abspath(self.working_dir) + "/log/*" + inputs=[{ + "paths": [ + self.beat_path + "/tests/system/input/", + ], + "seek": "tail", + }], ) journalbeat_proc = self.start_beat() @@ -58,9 +64,12 @@ def test_start_with_selected_journal_file(self): """ self.render_config_template( - journal_path=self.beat_path + "/tests/system/input/test.journal", - seek_method="head", - path=os.path.abspath(self.working_dir) + "/log/*" + inputs=[{ + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + "seek": "head", + }], ) journalbeat_proc = self.start_beat() @@ -86,10 +95,13 @@ def test_start_with_selected_journal_file_with_cursor_fallback(self): """ self.render_config_template( - journal_path=self.beat_path + "/tests/system/input/test.journal", - seek_method="cursor", - cursor_seek_fallback="tail", - path=os.path.abspath(self.working_dir) + "/log/*" + inputs=[{ + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + "seek": "cursor", + "cursor_seek_fallback": "tail", + }], ) journalbeat_proc = self.start_beat() @@ -114,15 +126,19 @@ def test_read_events_with_existing_registry(self): Journalbeat is able to follow reading a from a journal with an existing registry file. """ - registry_path = self.beat_path + "/tests/system/input/test.registry" + registry_path = os.path.join(os.path.abspath(self.working_dir), "data", "registry") + os.mkdir(os.path.dirname(registry_path)) + copyfile(self.beat_path + "/tests/system/input/test.registry", + os.path.join(os.path.abspath(self.working_dir), "data/registry")) input_path = self.beat_path + "/tests/system/input/test.journal" self._prepare_registry_file(registry_path, input_path) self.render_config_template( - journal_path=input_path, - seek_method="cursor", - registry_file=registry_path, - path=os.path.abspath(self.working_dir) + "/log/*", + inputs=[{ + "paths": [input_path], + "seek": "cursor", + "cursor_seek_fallback": "tail", + }], ) journalbeat_proc = self.start_beat() @@ -134,7 +150,7 @@ def test_read_events_with_existing_registry(self): # message can be read from test journal "please report the conditions when this event happened to", # only one event is read and published - "journalbeat successfully published 1 events", + 'journalbeat successfully published events\t{"event.count": 1}', ] for snippet in required_log_snippets: self.wait_until(lambda: self.log_contains(snippet), @@ -150,10 +166,15 @@ def test_read_events_with_include_matches(self): """ self.render_config_template( - journal_path=self.beat_path + "/tests/system/input/test.journal", - seek_method="head", - matches="syslog.priority=5", - path=os.path.abspath(self.working_dir) + "/log/*", + inputs=[{ + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + "seek": "head", + "include_matches": [ + "syslog.priority=5", + ] + }], ) journalbeat_proc = self.start_beat() @@ -167,7 +188,7 @@ def test_read_events_with_include_matches(self): "please report the conditions when this event happened to", "unhandled HKEY event 0x60b1", # Four events with priority 5 is publised - "journalbeat successfully published 4 events", + 'journalbeat successfully published events\t{"event.count": 4}', ] for snippet in required_log_snippets: self.wait_until(lambda: self.log_contains(snippet), @@ -176,6 +197,45 @@ def test_read_events_with_include_matches(self): exit_code = journalbeat_proc.kill_and_wait() assert exit_code == 0 + @unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux") + def test_input_id(self): + """ + Journalbeat persists states with IDs. + """ + + self.render_config_template( + inputs=[ + { + "id": "serviceA.unit", + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + }, + { + "id": "serviceB unit", + "paths": [ + self.beat_path + "/tests/system/input/test.journal", + ], + } + ], + ) + + # Run the beat until it publishes events from both inputs. + journalbeat_proc = self.start_beat() + expected_msg = 'successfully published events' + self.wait_until(lambda: self.log_contains(expected_msg)) + self.wait_until(lambda: self.log_contains(expected_msg)) + journalbeat_proc.check_kill_and_wait() + + # Verify that registry paths are prefixed with an ID. + registry_data = self.read_registry() + self.assertIn("journal_entries", registry_data) + journal_entries = registry_data['journal_entries'] + self.assertGreater(len(journal_entries), 0) + for item in journal_entries: + self.assertTrue(item['path'].startswith('journald::'), "starts with journald::") + self.assertTrue(item['path'].find('::service'), "ends with ::") + def _prepare_registry_file(self, registry_path, journal_path): lines = [] with open(registry_path, "r") as registry_file: @@ -186,6 +246,12 @@ def _prepare_registry_file(self, registry_path, journal_path): for line in lines: registry_file.write(line) + def read_registry(self): + registry_path = os.path.join(os.path.abspath(self.working_dir), "data", "registry") + + with open(registry_path, "r") as stream: + return yaml.safe_load(stream) + if __name__ == '__main__': unittest.main()