Skip to content

Commit

Permalink
Cherry-pick #9106 to 6.x: Add missing journalbeat non breaking fixes (#…
Browse files Browse the repository at this point in the history
…9275)

* Revert "Minor Journalbeat fixes and additions (#8973) (#9007)"

This reverts commit 7c6081d.

* Add missing journalbeat non breaking fixes (#9106)

Backported from master:

* refactoring of `SeekMode`
* journalbeat can be stopped when no output is available
* add dashboard

++ add deprecation warnings for options I want to remove in 7.0
(cherry picked from commit f7638a5)
  • Loading branch information
kvch authored Nov 28, 2018
1 parent 1ceeb87 commit 7981846
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ https://github.com/elastic/beats/compare/v6.5.0...6.x[Check the HEAD diff]

*Journalbeat*

- Add missing journalbeat non breaking fixes. {pull}9106[9106]

*Metricbeat*

- Add missing namespace field in http server metricset {pull}7890[7890]
Expand Down
14 changes: 13 additions & 1 deletion journalbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ journalbeat.inputs:
# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 20s
#max_backoff: 60s

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
Expand All @@ -42,3 +42,15 @@ journalbeat.inputs:
# Name of the registry file. If a relative path is used, it is considered relative to the
# data path.
#registry_file: registry

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []
22 changes: 19 additions & 3 deletions journalbeat/beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import (
"github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/journalbeat/config"
conf "github.com/elastic/beats/journalbeat/config"
)

// Journalbeat instance
type Journalbeat struct {
inputs []*input.Input
done chan struct{}
config config.Config
config conf.Config

pipeline beat.Pipeline
checkpoint *checkpoint.Checkpoint
Expand All @@ -48,7 +48,23 @@ type Journalbeat struct {
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
cfgwarn.Experimental("Journalbeat is experimental.")

config := config.DefaultConfig
if cfg.HasField("seek") {
cfgwarn.Deprecate("7.0.0", "global seek is deprecated, Use seek on input level instead.")
}

if cfg.HasField("backoff") {
cfgwarn.Deprecate("7.0.0", "global backoff is deprecated, Use backoff on input level instead.")
}

if cfg.HasField("max_backoff") {
cfgwarn.Deprecate("7.0.0", "global max_backoff is deprecated, Use max_backoff on input level instead.")
}

if cfg.HasField("include_matches") {
cfgwarn.Deprecate("7.0.0", "global include_matches is deprecated, Use include_matches on input level instead.")
}

config := conf.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("error reading config file: %v", err)
}
Expand Down
3 changes: 0 additions & 3 deletions journalbeat/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"gopkg.in/yaml.v2"

"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/paths"
)

// Checkpoint persists event log state information to disk.
Expand Down Expand Up @@ -88,8 +87,6 @@ func NewCheckpoint(file string, maxUpdates int, interval time.Duration) (*Checkp
save: make(chan JournalState, 1),
}

c.file = paths.Resolve(paths.Data, c.file)

// Minimum batch size.
if c.maxUpdates < 1 {
c.maxUpdates = 1
Expand Down
10 changes: 9 additions & 1 deletion journalbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package config

import (
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
)
Expand All @@ -33,6 +34,10 @@ type SeekMode uint8
type Config struct {
Inputs []*common.Config `config:"inputs"`
RegistryFile string `config:"registry_file"`
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Seek SeekMode `config:"seek"`
Matches []string `config:"include_matches"`
}

const (
Expand All @@ -50,10 +55,13 @@ const (
seekCursorStr = "cursor"
)

// DefaultConfig are the defaults of a Journalbeat instance
var (
// DefaultConfig are the defaults of a Journalbeat instance
DefaultConfig = Config{
RegistryFile: "registry",
Backoff: 1 * time.Second,
MaxBackoff: 60 * time.Second,
Seek: SeekCursor,
}

seekModes = map[string]SeekMode{
Expand Down
13 changes: 8 additions & 5 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ import (
type Config struct {
// Paths stores the paths to the journal files to be read.
Paths []string `config:"paths"`
// MaxBackoff is the limit of the backoff time.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// Backoff is the current interval to wait before
// attemting to read again from the journal.
Backoff time.Duration `config:"backoff" validate:"min=0,nonzero"`
// MaxBackoff is the limit of the backoff time.
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
// BackoffFactor is the multiplier of Backoff.
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek config.SeekMode `config:"seek"`
Expand All @@ -48,8 +50,9 @@ type Config struct {
var (
// DefaultConfig is the defaults for an inputs
DefaultConfig = Config{
Backoff: 1 * time.Second,
MaxBackoff: 20 * time.Second,
Seek: config.SeekCursor,
Backoff: 1 * time.Second,
BackoffFactor: 2,
MaxBackoff: 60 * time.Second,
Seek: config.SeekCursor,
}
)
14 changes: 13 additions & 1 deletion journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ journalbeat.inputs:
# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 20s
#max_backoff: 60s

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
Expand All @@ -43,6 +43,18 @@ journalbeat.inputs:
# data path.
#registry_file: registry

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []

#================================ General ======================================

# The name of the shipper that publishes the network data. It can be used to group
Expand Down
14 changes: 13 additions & 1 deletion journalbeat/journalbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ journalbeat.inputs:
# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 20s
#max_backoff: 60s

# Position to start reading from journal. Valid values: head, tail, cursor
seek: cursor
Expand All @@ -43,6 +43,18 @@ journalbeat.inputs:
# data path.
#registry_file: registry

# The number of seconds to wait before trying to read again from journals.
#backoff: 1s
# The maximum number of seconds to wait before attempting to read again from journals.
#max_backoff: 60s

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
Expand Down
3 changes: 1 addition & 2 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func (r *Reader) seek(cursor string) {
r.logger.Debug("Seeked to position defined in cursor")
case config.SeekTail:
r.journal.SeekTail()
r.journal.Next()
r.logger.Debug("Tailing the journal file")
case config.SeekHead:
r.journal.SeekHead()
Expand Down Expand Up @@ -225,7 +224,7 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event {
}

if len(custom) != 0 {
fields.Put("journald.custom", custom)
fields["custom"] = custom
}

state := checkpoint.JournalState{
Expand Down
6 changes: 2 additions & 4 deletions journalbeat/reader/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ func TestToEvent(t *testing.T) {
},
},
expectedFields: common.MapStr{
"journald": common.MapStr{
"custom": common.MapStr{
"my_custom_field": "value",
},
"custom": common.MapStr{
"my_custom_field": "value",
},
},
},
Expand Down

0 comments on commit 7981846

Please sign in to comment.