Skip to content

Commit

Permalink
Minor Journalbeat fixes and additions (elastic#8973) (elastic#9007)
Browse files Browse the repository at this point in the history
### Refactoring of option `seek`

Previously, the option was string, so deciding which seeker function has to be called used string comparisons. I added `SeekMode` so the mode can be an `iota` and provided its own `Unpack` function. This also takes care of validating the user configured value.

### Field renaming

I renamed `custom.*` prefix to `journald.custom.*`, so users know where those custom fields are coming from.

### Dashboard

It is a minimal dashboard with a few predefined searches. When modules are available to parse messages coming from journald, it is going to be possible create prettier visualizations.

### Skip last event when `seek` is set to `tail`

Previously, if `seek` was set to `tail`, the last event in the journal was read. Now this last event is skipped to avoid duplication.

### Unstoppable Journalbeat (haha)

If the output was unreachable Journalbeat got stuck when it retried to connect to the output. As the Beat never stops trying, it never returned from the last `client.Publish` call. Thus, `publishAll` function never stopped, because it never received any signal from the `done` channel of the input.
The client of each input is closed during `Stop` of each input.

### Registry file path

Previously, Journalbeat put its registry file under `/registry` when installed from deb package. From now the registry file resides under the folder specified  by `-path.data`.
(cherry picked from commit 7fee516)
  • Loading branch information
kvch authored Nov 9, 2018
1 parent 35d006a commit 7c6081d
Show file tree
Hide file tree
Showing 11 changed files with 250 additions and 89 deletions.
14 changes: 1 addition & 13 deletions journalbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ journalbeat.inputs:
# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s
#max_backoff: 20s

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
Expand All @@ -42,15 +42,3 @@ journalbeat.inputs:
# Name of the registry file. If a relative path is used, it is considered relative to the
# data path.
#registry_file: registry

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []
169 changes: 169 additions & 0 deletions journalbeat/_meta/kibana/6/dashboard/Journalbeat-overview.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
{
"objects": [
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "process.name:systemd"
},
"version": true
}
},
"sort": [
"@timestamp",
"desc"
],
"title": "[Journalbeat] Systemd messages",
"version": 1
},
"id": "aa003e90-e2b9-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:19:28.377Z",
"version": 1
},
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"journald.kernel.subsystem",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "syslog.facility:0 AND syslog.priority:\u003c4"
},
"version": true
}
},
"sort": [
"_score",
"desc"
],
"title": "[Journalbeat] Kernel errors",
"version": 1
},
"id": "5db75310-e2ba-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:24:29.889Z",
"version": 1
},
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"process.name",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "syslog.facility:4"
},
"version": true
}
},
"sort": [
"_score",
"desc"
],
"title": "[Journalbeat] Login authorization",
"version": 1
},
"id": "82408120-e2ba-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:26:05.348Z",
"version": 2
},
{
"attributes": {
"columns": [
"@timestamp",
"host.name",
"journald.kernel.subsystem",
"journald.kernel.device_node_path",
"message"
],
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"highlightAll": true,
"index": "journalbeat-*",
"query": {
"language": "lucene",
"query": "journald.kernel.subsystem:usb OR journald.kernel.subsystem:hid"
},
"version": true
}
},
"sort": [
"_score",
"desc"
],
"title": "[Journalbeat] USB and HID messages",
"version": 1
},
"id": "f0232670-e2ba-11e8-9f52-734e93de180d",
"type": "search",
"updated_at": "2018-11-07T18:28:35.543Z",
"version": 1
},
{
"attributes": {
"description": "",
"hits": 0,
"kibanaSavedObjectMeta": {
"searchSourceJSON": {
"filter": [],
"query": {
"language": "lucene",
"query": ""
}
}
},
"optionsJSON": {
"darkTheme": false,
"hidePanelTitles": false,
"useMargins": true
},
"panelsJSON": null,
"timeRestore": false,
"title": "[Journalbeat] Overview",
"version": 1
},
"id": "f2de4440-e2b9-11e8-9f52-734e93de180d",
"type": "dashboard",
"updated_at": "2018-11-07T18:30:18.083Z",
"version": 2
}
],
"version": "7.0.0-alpha1-SNAPSHOT"
}
3 changes: 3 additions & 0 deletions journalbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
)

// Checkpoint persists event log state information to disk.
Expand Down Expand Up @@ -87,6 +88,8 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
save: make(chan JournalState, 1),
}

c.file = paths.Resolve(paths.Data, c.file)

// Minimum batch size.
if c.maxUpdates < 1 {
c.maxUpdates = 1
Expand Down
53 changes: 42 additions & 11 deletions journalbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,56 @@
package config

import (
"time"
"fmt"

"github.com/elastic/beats/libbeat/common"
)

// SeekMode is specifies how a journal is read
type SeekMode uint8

// Config stores the configuration of Journalbeat
type Config struct {
Inputs []*common.Config `config:"inputs"`
RegistryFile string `config:"registry_file"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Seek string `config:"seek"`
Matches []string `config:"include_matches"`
}

// DefaultConfig are the defaults of a Journalbeat instance
var DefaultConfig = Config{
RegistryFile: "registry",
Backoff: 1 * time.Second,
MaxBackoff: 60 * time.Second,
Seek: "cursor",
const (
// SeekInvalid is an invalid value for seek
SeekInvalid SeekMode = iota
// SeekHead option seeks to the head of a journal
SeekHead
// SeekTail option seeks to the tail of a journal
SeekTail
// SeekCursor option seeks to the position specified in the cursor
SeekCursor

seekHeadStr = "head"
seekTailStr = "tail"
seekCursorStr = "cursor"
)

var (
// DefaultConfig are the defaults of a Journalbeat instance
DefaultConfig = Config{
RegistryFile: "registry",
}

seekModes = map[string]SeekMode{
seekHeadStr: SeekHead,
seekTailStr: SeekTail,
seekCursorStr: SeekCursor,
}
)

// Unpack validates and unpack "seek" config option
func (m *SeekMode) Unpack(value string) error {
mode, ok := seekModes[value]
if !ok {
return fmt.Errorf("invalid seek mode '%s'", value)
}

*m = mode

return nil
}
33 changes: 7 additions & 26 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package input

import (
"fmt"
"time"

"github.com/elastic/beats/journalbeat/config"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)
Expand All @@ -29,15 +29,13 @@ import (
type Config struct {
// Paths stores the paths to the journal files to be read.
Paths []string `config:"paths"`
// MaxBackoff is the limit of the backoff time.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// Backoff is the current interval to wait before
// attemting to read again from the journal.
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
// BackoffFactor is the multiplier of Backoff.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// MaxBackoff is the limit of the backoff time.
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek string `config:"seek"`
Seek config.SeekMode `config:"seek"`
// Matches store the key value pairs to match entries.
Matches []string `config:"include_matches"`

Expand All @@ -50,25 +48,8 @@ type Config struct {
var (
// DefaultConfig is the defaults for an inputs
DefaultConfig = Config{
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 60 * time.Second,
Seek: "cursor",
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
}
)

// Validate check the configuration of the input.
func (c *Config) Validate() error {
correctSeek := false
for _, s := range []string{"cursor", "head", "tail"} {
if c.Seek == s {
correctSeek = true
}
}

if !correctSeek {
return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek)
}

return nil
}
12 changes: 7 additions & 5 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Input struct {
done chan struct{}
config Config
pipeline beat.Pipeline
client beat.Client
states map[string]checkpoint.JournalState
id uuid.UUID
logger *logp.Logger
Expand Down Expand Up @@ -120,7 +121,8 @@ func New(
// Run connects to the output, collects entries from the readers
// and then publishes the events.
func (i *Input) Run() {
client, err := i.pipeline.ConnectWith(beat.ClientConfig{
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: i.eventMeta,
Meta: nil,
Expand All @@ -133,13 +135,12 @@ func (i *Input) Run() {
i.logger.Error("Error connecting to output: %v", err)
return
}
defer client.Close()

i.publishAll(client)
i.publishAll()
}

// publishAll reads events from all readers and publishes them.
func (i *Input) publishAll(client beat.Client) {
func (i *Input) publishAll() {
out := make(chan *beat.Event)
defer close(out)

Expand Down Expand Up @@ -179,13 +180,14 @@ func (i *Input) publishAll(client beat.Client) {
case <-i.done:
return
case e := <-out:
client.Publish(*e)
i.client.Publish(*e)
}
}
}

// Stop stops all readers of the input.
func (i *Input) Stop() {
i.client.Close()
for _, r := range i.readers {
r.Close()
}
Expand Down
Loading

0 comments on commit 7c6081d

Please sign in to comment.