From 7c6081d243218b6ece350e2dab3489b955809a92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Fri, 9 Nov 2018 12:38:56 +0100 Subject: [PATCH] Minor Journalbeat fixes and additions (#8973) (#9007) ### Refactoring of option `seek` Previously, the option was string, so deciding which seeker function has to be called used string comparisons. I added `SeekMode` so the mode can be an `iota` and provided its own `Unpack` function. This also takes care of validating the user configured value. ### Field renaming I renamed `custom.*` prefix to `journald.custom.*`, so users know where those custom fields are coming from. ### Dashboard It is a minimal dashboard with a few predefined searches. When modules are available to parse messages coming from journald, it is going to be possible create prettier visualizations. ### Skip last event when `seek` is set to `tail` Previously, if `seek` was set to `tail`, the last event in the journal was read. Now this last event is skipped to avoid duplication. ### Unstoppable Journalbeat (haha) If the output was unreachable Journalbeat got stuck when it retried to connect to the output. As the Beat never stops trying, it never returned from the last `client.Publish` call. Thus, `publishAll` function never stopped, because it never received any signal from the `done` channel of the input. The client of each input is closed during `Stop` of each input. ### Registry file path Previously, Journalbeat put its registry file under `/registry` when installed from deb package. From now the registry file resides under the folder specified by `-path.data`. (cherry picked from commit 7fee516ea7a79f8ab5865186a763e36056dbd658) --- journalbeat/_meta/beat.yml | 14 +- .../6/dashboard/Journalbeat-overview.json | 169 ++++++++++++++++++ journalbeat/checkpoint/checkpoint.go | 3 + journalbeat/config/config.go | 53 ++++-- journalbeat/input/config.go | 33 +--- journalbeat/input/input.go | 12 +- journalbeat/journalbeat.reference.yml | 14 +- journalbeat/journalbeat.yml | 14 +- journalbeat/reader/config.go | 8 +- journalbeat/reader/journal.go | 13 +- journalbeat/reader/journal_test.go | 6 +- 11 files changed, 250 insertions(+), 89 deletions(-) create mode 100644 journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json diff --git a/journalbeat/_meta/beat.yml b/journalbeat/_meta/beat.yml index c4e3c14db56c..78c28ffe36d9 100644 --- a/journalbeat/_meta/beat.yml +++ b/journalbeat/_meta/beat.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s + #max_backoff: 20s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -42,15 +42,3 @@ journalbeat.inputs: # Name of the registry file. If a relative path is used, it is considered relative to the # data path. #registry_file: registry - - # The number of seconds to wait before trying to read again from journals. - #backoff: 1s - # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s - - # Position to start reading from all journal. Possible values: head, tail, cursor - #seek: head - - # Exact matching for field values of events. - # Matching for nginx entries: "systemd.unit=nginx" - #matches: [] diff --git a/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json b/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json new file mode 100644 index 000000000000..fc771e9bebd4 --- /dev/null +++ b/journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json @@ -0,0 +1,169 @@ +{ + "objects": [ + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "process.name:systemd" + }, + "version": true + } + }, + "sort": [ + "@timestamp", + "desc" + ], + "title": "[Journalbeat] Systemd messages", + "version": 1 + }, + "id": "aa003e90-e2b9-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:19:28.377Z", + "version": 1 + }, + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "journald.kernel.subsystem", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "syslog.facility:0 AND syslog.priority:\u003c4" + }, + "version": true + } + }, + "sort": [ + "_score", + "desc" + ], + "title": "[Journalbeat] Kernel errors", + "version": 1 + }, + "id": "5db75310-e2ba-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:24:29.889Z", + "version": 1 + }, + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "process.name", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "syslog.facility:4" + }, + "version": true + } + }, + "sort": [ + "_score", + "desc" + ], + "title": "[Journalbeat] Login authorization", + "version": 1 + }, + "id": "82408120-e2ba-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:26:05.348Z", + "version": 2 + }, + { + "attributes": { + "columns": [ + "@timestamp", + "host.name", + "journald.kernel.subsystem", + "journald.kernel.device_node_path", + "message" + ], + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "highlightAll": true, + "index": "journalbeat-*", + "query": { + "language": "lucene", + "query": "journald.kernel.subsystem:usb OR journald.kernel.subsystem:hid" + }, + "version": true + } + }, + "sort": [ + "_score", + "desc" + ], + "title": "[Journalbeat] USB and HID messages", + "version": 1 + }, + "id": "f0232670-e2ba-11e8-9f52-734e93de180d", + "type": "search", + "updated_at": "2018-11-07T18:28:35.543Z", + "version": 1 + }, + { + "attributes": { + "description": "", + "hits": 0, + "kibanaSavedObjectMeta": { + "searchSourceJSON": { + "filter": [], + "query": { + "language": "lucene", + "query": "" + } + } + }, + "optionsJSON": { + "darkTheme": false, + "hidePanelTitles": false, + "useMargins": true + }, + "panelsJSON": null, + "timeRestore": false, + "title": "[Journalbeat] Overview", + "version": 1 + }, + "id": "f2de4440-e2b9-11e8-9f52-734e93de180d", + "type": "dashboard", + "updated_at": "2018-11-07T18:30:18.083Z", + "version": 2 + } + ], + "version": "7.0.0-alpha1-SNAPSHOT" +} diff --git a/journalbeat/checkpoint/checkpoint.go b/journalbeat/checkpoint/checkpoint.go index f2c3bfacdabc..0f29861040b4 100644 --- a/journalbeat/checkpoint/checkpoint.go +++ b/journalbeat/checkpoint/checkpoint.go @@ -32,6 +32,7 @@ import ( "gopkg.in/yaml.v2" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/paths" ) // Checkpoint persists event log state information to disk. @@ -87,6 +88,8 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp save: make(chan JournalState, 1), } + c.file = paths.Resolve(paths.Data, c.file) + // Minimum batch size. if c.maxUpdates < 1 { c.maxUpdates = 1 diff --git a/journalbeat/config/config.go b/journalbeat/config/config.go index a2c5b69d9514..395bf13ec9cb 100644 --- a/journalbeat/config/config.go +++ b/journalbeat/config/config.go @@ -21,25 +21,56 @@ package config import ( - "time" + "fmt" "github.com/elastic/beats/libbeat/common" ) +// SeekMode is specifies how a journal is read +type SeekMode uint8 + // Config stores the configuration of Journalbeat type Config struct { Inputs []*common.Config `config:"inputs"` RegistryFile string `config:"registry_file"` - Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` - MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` - Seek string `config:"seek"` - Matches []string `config:"include_matches"` } -// DefaultConfig are the defaults of a Journalbeat instance -var DefaultConfig = Config{ - RegistryFile: "registry", - Backoff: 1 * time.Second, - MaxBackoff: 60 * time.Second, - Seek: "cursor", +const ( + // SeekInvalid is an invalid value for seek + SeekInvalid SeekMode = iota + // SeekHead option seeks to the head of a journal + SeekHead + // SeekTail option seeks to the tail of a journal + SeekTail + // SeekCursor option seeks to the position specified in the cursor + SeekCursor + + seekHeadStr = "head" + seekTailStr = "tail" + seekCursorStr = "cursor" +) + +var ( + // DefaultConfig are the defaults of a Journalbeat instance + DefaultConfig = Config{ + RegistryFile: "registry", + } + + seekModes = map[string]SeekMode{ + seekHeadStr: SeekHead, + seekTailStr: SeekTail, + seekCursorStr: SeekCursor, + } +) + +// Unpack validates and unpack "seek" config option +func (m *SeekMode) Unpack(value string) error { + mode, ok := seekModes[value] + if !ok { + return fmt.Errorf("invalid seek mode '%s'", value) + } + + *m = mode + + return nil } diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index 6383998bd1b0..5bdbfcd2ec95 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -18,9 +18,9 @@ package input import ( - "fmt" "time" + "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" ) @@ -29,15 +29,13 @@ import ( type Config struct { // Paths stores the paths to the journal files to be read. Paths []string `config:"paths"` - // MaxBackoff is the limit of the backoff time. - Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` // Backoff is the current interval to wait before // attemting to read again from the journal. - BackoffFactor int `config:"backoff_factor" validate:"min=1"` - // BackoffFactor is the multiplier of Backoff. + Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"` + // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` // Seek is the method to read from journals. - Seek string `config:"seek"` + Seek config.SeekMode `config:"seek"` // Matches store the key value pairs to match entries. Matches []string `config:"include_matches"` @@ -50,25 +48,8 @@ type Config struct { var ( // DefaultConfig is the defaults for an inputs DefaultConfig = Config{ - Backoff: 1 * time.Second, - BackoffFactor: 2, - MaxBackoff: 60 * time.Second, - Seek: "cursor", + Backoff: 1 * time.Second, + MaxBackoff: 20 * time.Second, + Seek: config.SeekCursor, } ) - -// Validate check the configuration of the input. -func (c *Config) Validate() error { - correctSeek := false - for _, s := range []string{"cursor", "head", "tail"} { - if c.Seek == s { - correctSeek = true - } - } - - if !correctSeek { - return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek) - } - - return nil -} diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index 42d8a0ea394d..094d169a4ca8 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -37,6 +37,7 @@ type Input struct { done chan struct{} config Config pipeline beat.Pipeline + client beat.Client states map[string]checkpoint.JournalState id uuid.UUID logger *logp.Logger @@ -120,7 +121,8 @@ func New( // Run connects to the output, collects entries from the readers // and then publishes the events. func (i *Input) Run() { - client, err := i.pipeline.ConnectWith(beat.ClientConfig{ + var err error + i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, EventMetadata: i.eventMeta, Meta: nil, @@ -133,13 +135,12 @@ func (i *Input) Run() { i.logger.Error("Error connecting to output: %v", err) return } - defer client.Close() - i.publishAll(client) + i.publishAll() } // publishAll reads events from all readers and publishes them. -func (i *Input) publishAll(client beat.Client) { +func (i *Input) publishAll() { out := make(chan *beat.Event) defer close(out) @@ -179,13 +180,14 @@ func (i *Input) publishAll(client beat.Client) { case <-i.done: return case e := <-out: - client.Publish(*e) + i.client.Publish(*e) } } } // Stop stops all readers of the input. func (i *Input) Stop() { + i.client.Close() for _, r := range i.readers { r.Close() } diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index edc5af4b179f..46267fa6b09c 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s + #max_backoff: 20s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -43,18 +43,6 @@ journalbeat.inputs: # data path. #registry_file: registry - # The number of seconds to wait before trying to read again from journals. - #backoff: 1s - # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s - - # Position to start reading from all journal. Possible values: head, tail, cursor - #seek: head - - # Exact matching for field values of events. - # Matching for nginx entries: "systemd.unit=nginx" - #matches: [] - #================================ General ====================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index 753c6ef4f8f7..b2ab42fb81b7 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -21,7 +21,7 @@ journalbeat.inputs: # The number of seconds to wait before trying to read again from journals. #backoff: 1s # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s + #max_backoff: 20s # Position to start reading from journal. Valid values: head, tail, cursor seek: cursor @@ -43,18 +43,6 @@ journalbeat.inputs: # data path. #registry_file: registry - # The number of seconds to wait before trying to read again from journals. - #backoff: 1s - # The maximum number of seconds to wait before attempting to read again from journals. - #max_backoff: 60s - - # Position to start reading from all journal. Possible values: head, tail, cursor - #seek: head - - # Exact matching for field values of events. - # Matching for nginx entries: "systemd.unit=nginx" - #matches: [] - #================================ General ===================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/reader/config.go b/journalbeat/reader/config.go index b81005ec926e..7d52ff7422df 100644 --- a/journalbeat/reader/config.go +++ b/journalbeat/reader/config.go @@ -17,7 +17,11 @@ package reader -import "time" +import ( + "time" + + "github.com/elastic/beats/journalbeat/config" +) // Config stores the options of a reder. type Config struct { @@ -25,7 +29,7 @@ type Config struct { Path string // Seek specifies the seeking stategy. // Possible values: head, tail, cursor. - Seek string + Seek config.SeekMode // MaxBackoff is the limit of the backoff time. MaxBackoff time.Duration // Backoff is the current interval to wait before diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index f7afc30a4d0c..8df68170fcda 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/journalbeat/checkpoint" "github.com/elastic/beats/journalbeat/cmd/instance" + "github.com/elastic/beats/journalbeat/config" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -142,7 +143,8 @@ func setupMatches(j *sdjournal.Journal, matches []string) error { // seek seeks to the position determined by the coniguration and cursor state. func (r *Reader) seek(cursor string) { - if r.config.Seek == "cursor" { + switch r.config.Seek { + case config.SeekCursor: if cursor == "" { r.journal.SeekHead() r.logger.Debug("Seeking method set to cursor, but no state is saved for reader. Starting to read from the beginning") @@ -154,12 +156,15 @@ func (r *Reader) seek(cursor string) { r.logger.Error("Error while seeking to cursor") } r.logger.Debug("Seeked to position defined in cursor") - } else if r.config.Seek == "tail" { + case config.SeekTail: r.journal.SeekTail() + r.journal.Next() r.logger.Debug("Tailing the journal file") - } else if r.config.Seek == "head" { + case config.SeekHead: r.journal.SeekHead() r.logger.Debug("Reading from the beginning of the journal file") + default: + r.logger.Error("Invalid seeking mode") } } @@ -220,7 +225,7 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { } if len(custom) != 0 { - fields["custom"] = custom + fields.Put("journald.custom", custom) } state := checkpoint.JournalState{ diff --git a/journalbeat/reader/journal_test.go b/journalbeat/reader/journal_test.go index 5170afd25933..8c37026f8ba3 100644 --- a/journalbeat/reader/journal_test.go +++ b/journalbeat/reader/journal_test.go @@ -65,8 +65,10 @@ func TestToEvent(t *testing.T) { }, }, expectedFields: common.MapStr{ - "custom": common.MapStr{ - "my_custom_field": "value", + "journald": common.MapStr{ + "custom": common.MapStr{ + "my_custom_field": "value", + }, }, }, },