Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Journalbeat] Add ID config option input #18467

Merged
merged 1 commit into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,9 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Journalbeat*

- Added an `id` config option to inputs to allow running multiple inputs on the
same journal. {pull}18467{18467}

*Metricbeat*

- Move the windows pdh implementation from perfmon to a shared location in order for future modules/metricsets to make use of. {pull}15503[15503]
Expand Down
7 changes: 6 additions & 1 deletion Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def linuxDebianProvision()
#!/usr/bin/env bash
set -eio pipefail
apt-get update
apt-get install -y make gcc python3 python3-pip python3-venv git
apt-get install -y make gcc python3 python3-pip python3-venv git libsystemd-dev
SCRIPT
end

Expand Down Expand Up @@ -229,6 +229,11 @@ Vagrant.configure(2) do |config|
c.vm.box = "ubuntu/bionic64"
c.vm.network :forwarded_port, guest: 22, host: 2228, id: "ssh", auto_correct: true

c.vm.provider :virtualbox do |vbox|
vbox.memory = 4096
vbox.cpus = 4
end

c.vm.provision "shell", inline: $unixProvision, privileged: false
c.vm.provision "shell", inline: linuxGvmProvision, privileged: false
c.vm.provision "shell", inline: linuxDebianProvision
Expand Down
5 changes: 5 additions & 0 deletions journalbeat/_meta/config/beat.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ journalbeat.inputs:
# When empty starts to read from local journal.
- paths: []

# An optional unique identifier for the input. By providing a unique `id` you
# can operate multiple inputs on the same journal. This allows each input's
# cursor to be persisted independently in the registry file.
#id: ""

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
Expand Down
5 changes: 5 additions & 0 deletions journalbeat/_meta/config/beat.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ journalbeat.inputs:
# When empty starts to read from local journal.
- paths: []

# An optional unique identifier for the input. By providing a unique `id` you
# can operate multiple inputs on the same journal. This allows each input's
# cursor to be persisted independently in the registry file.
#id: ""

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
Expand Down
27 changes: 24 additions & 3 deletions journalbeat/docs/config-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@ in a Docker container. However, in this example the fields are matched using the
You can specify the following options to configure how {beatname_uc} reads the
journal files.

[float]
[id="{beatname_lc}-id"]
==== `id`

An optional unique identifier for the input. By providing a unique `id` you can
operate multiple inputs on the same journal. This allows each input's cursor
to be persisted independently in the registry file.

----
{beatname_lc}.inputs:
- id: consul.service
paths: []
include_matches:
- _SYSTEMD_UNIT=consul.service

- id: vault.service
paths: []
include_matches:
- _SYSTEMD_UNIT=vault.service
----

[float]
[id="{beatname_lc}-paths"]
==== `paths`
Expand Down Expand Up @@ -108,10 +129,10 @@ The position to start reading the journal from. Valid settings are:

* `head`: Starts reading at the beginning of the journal. After a restart,
{beatname_uc} resends all log messages in the journal.
* `tail`: Starts reading at the end of the journal. After a restart,
* `tail`: Starts reading at the end of the journal. After a restart,
{beatname_uc} resends the last message, which might result in duplicates. If
multiple log messages are written to a journal while {beatname_uc} is down,
only the last log message is sent on restart.
only the last log message is sent on restart.
* `cursor`: On first read, starts reading at the beginning of the journal. After a
reload or restart, continues reading at the last known position.

Expand Down Expand Up @@ -207,7 +228,7 @@ journald fields:

The following translated fields for
https://docs.docker.com/config/containers/logging/journald/[Docker] are also
available:
available:

[horizontal]
`CONTAINER_ID`:: `container.id_truncated`
Expand Down
2 changes: 2 additions & 0 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

// Config stores the options of an input.
type Config struct {
// Unique ID of the input for state persistence purposes.
ID string `config:"id"`
// Paths stores the paths to the journal files to be read.
Paths []string `config:"paths"`
// Backoff is the current interval to wait before
Expand Down
30 changes: 18 additions & 12 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import (

"github.com/elastic/beats/v7/libbeat/common/fmtstr"

"github.com/gofrs/uuid"

"github.com/elastic/beats/v7/journalbeat/checkpoint"
"github.com/elastic/beats/v7/journalbeat/reader"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -43,7 +41,6 @@ type Input struct {
pipeline beat.Pipeline
client beat.Client
states map[string]checkpoint.JournalState
id uuid.UUID
logger *logp.Logger
eventMeta common.EventMetadata
processors beat.ProcessorList
Expand All @@ -61,13 +58,11 @@ func New(
return nil, err
}

id, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("error while generating ID for input: %v", err)
logger := logp.NewLogger("input")
if config.ID != "" {
logger = logger.With("id", config.ID)
}

logger := logp.NewLogger("input").With("id", id)

var readers []*reader.Reader
if len(config.Paths) == 0 {
cfg := reader.Config{
Expand All @@ -78,9 +73,10 @@ func New(
CursorSeekFallback: config.CursorSeekFallback,
Matches: config.Matches,
SaveRemoteHostname: config.SaveRemoteHostname,
CheckpointID: checkpointID(config.ID, reader.LocalSystemJournalID),
}

state := states[reader.LocalSystemJournalID]
state := states[cfg.CheckpointID]
r, err := reader.NewLocal(cfg, done, state, logger)
if err != nil {
return nil, fmt.Errorf("error creating reader for local journal: %v", err)
Expand All @@ -97,8 +93,10 @@ func New(
CursorSeekFallback: config.CursorSeekFallback,
Matches: config.Matches,
SaveRemoteHostname: config.SaveRemoteHostname,
CheckpointID: checkpointID(config.ID, p),
}
state := states[p]

state := states[cfg.CheckpointID]
r, err := reader.New(cfg, done, state, logger)
if err != nil {
return nil, fmt.Errorf("error creating reader for journal: %v", err)
Expand All @@ -119,7 +117,6 @@ func New(
config: config,
pipeline: b.Publisher,
states: states,
id: id,
logger: logger,
eventMeta: config.EventMetadata,
processors: inputProcessors,
Expand All @@ -138,7 +135,7 @@ func (i *Input) Run() {
Processor: i.processors,
},
ACKCount: func(n int) {
i.logger.Infof("journalbeat successfully published %d events", n)
i.logger.Debugw("journalbeat successfully published events", "event.count", n)
},
})
if err != nil {
Expand Down Expand Up @@ -233,3 +230,12 @@ func processorsForInput(beatInfo beat.Info, config Config) (*processors.Processo

return procs, nil
}

// checkpointID returns the identifier used to track persistent state for the
// input.
func checkpointID(id, path string) string {
if id == "" {
return path
}
return "journald::" + path + "::" + id
}
5 changes: 5 additions & 0 deletions journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ journalbeat.inputs:
# When empty starts to read from local journal.
- paths: []

# An optional unique identifier for the input. By providing a unique `id` you
# can operate multiple inputs on the same journal. This allows each input's
# cursor to be persisted independently in the registry file.
#id: ""

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
Expand Down
5 changes: 5 additions & 0 deletions journalbeat/journalbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ journalbeat.inputs:
# When empty starts to read from local journal.
- paths: []

# An optional unique identifier for the input. By providing a unique `id` you
# can operate multiple inputs on the same journal. This allows each input's
# cursor to be persisted independently in the registry file.
#id: ""

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
Expand Down
2 changes: 2 additions & 0 deletions journalbeat/reader/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type Config struct {
Matches []string
// SaveRemoteHostname defines if the original source of the entry needs to be saved.
SaveRemoteHostname bool
// CheckpointID is the identifier to use when persisting state.
CheckpointID string
}

const (
Expand Down
2 changes: 1 addition & 1 deletion journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event {
}

state := checkpoint.JournalState{
Path: r.config.Path,
Path: r.config.CheckpointID,
Cursor: entry.Cursor,
RealtimeTimestamp: entry.RealtimeTimestamp,
MonotonicTimestamp: entry.MonotonicTimestamp,
Expand Down
82 changes: 5 additions & 77 deletions journalbeat/tests/system/config/journalbeat.yml.j2
Original file line number Diff line number Diff line change
@@ -1,85 +1,13 @@
################### Beat Configuration #########################
journalbeat.inputs:
- paths: [{{ journal_path }}]
seek: {{ seek_method }}
{% if cursor_seek_fallback %}
cursor_seek_fallback: {{ cursor_seek_fallback }}
{% endif %}
include_matches: [{{ matches }}]
{% for input in inputs %}
- {{ input | tojson }}
{% endfor %}

{% if registry_file is defined %}
journalbeat.registry_file: {{ registry_file }}
{% endif %}

############################# Output ##########################################

# Configure what outputs to use when sending the data collected by the beat.
# You can enable one or multiple outputs by setting enabled option to true.
output:

### File as output
file:
# Enabling file output
enabled: true

# Path to the directory where to save the generated files. The option is mandatory.
path: {{ output_file_path|default(beat.working_dir + "/output") }}


# Name of the generated files. The default is `journalbeat` and it generates
# files: `journalbeat`, `journalbeat.1`, `journalbeat.2`, etc.
filename: {{ output_file_filename|default("journalbeat") }}

# Maximum size in kilobytes of each file. When this size is reached, the files are
# rotated. The default value is 10 MB.
#rotate_every_kb: 10000

# Maximum number of files under path. When this number of files is reached, the
# oldest file is deleted and the rest are shifted from last to first. The default
# is 7 files.
#number_of_files: 7



############################# Beat #########################################

# The name of the shipper that publishes the network data. It can be used to group
# all the transactions sent by a single shipper in the web interface.
# If this options is not defined, the hostname is used.
#name:

# The tags of the shipper are included in their own field with each
# transaction published. Tags make it easy to group servers by different
# logical properties.
#tags: ["service-X", "web-tier"]



############################# Logging #########################################

#logging:
# Send all logging output to syslog. On Windows default is false, otherwise
# default is true.
#to_syslog: true

# Write all logging output to files. Beats automatically rotate files if configurable
# limit is reached.
#to_files: false

# Enable debug output for selected components.
#selectors: []

# Set log level
#level: error

#files:
# The directory where the log files will written to.
#path: /var/log/journalbeat

# The name of the files where the logs are written to.
#name: journalbeat

# Configure log file size limit. If limit is reached, log file will be
# automatically rotated
#rotateeverybytes: 10485760 # = 10MB

# Number of rotated log files to keep. Oldest files will be deleted first.
#keepfiles: 7
Loading