Skip to content

Commit

Permalink
Do not stop collecting events when journal entries change (#9994)
Browse files Browse the repository at this point in the history
Previously sd_journal_wait was not used. From now on all changes to journals are detected.

I also added custom seccomp policy to Journalbeat.

Closes #9533
(cherry picked from commit cead4b6)
  • Loading branch information
kvch committed Feb 1, 2019
1 parent 5c635b0 commit 9db33a4
Show file tree
Hide file tree
Showing 9 changed files with 352 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ https://github.com/elastic/beats/compare/v6.6.0...6.x[Check the HEAD diff]
*Journalbeat*

- Add missing journalbeat non breaking fixes. {pull}9106[9106]
- Do not stop collecting events when journal entries change. {pull}9994[9994]

*Metricbeat*

Expand Down
1 change: 1 addition & 0 deletions journalbeat/beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/elastic/beats/libbeat/logp"

conf "github.com/elastic/beats/journalbeat/config"
_ "github.com/elastic/beats/journalbeat/include"
)

// Journalbeat instance
Expand Down
3 changes: 3 additions & 0 deletions journalbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"github.com/elastic/beats/libbeat/common"
)

// SeekMode is specifies how a journal is read
type SeekMode uint8

// Config stores the configuration of Journalbeat
type Config struct {
Inputs []*common.Config `config:"inputs"`
Expand Down
122 changes: 122 additions & 0 deletions journalbeat/include/seccomp_linux_386.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

135 changes: 135 additions & 0 deletions journalbeat/include/seccomp_linux_amd64.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 1 addition & 17 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package input

import (
"fmt"
"time"

"github.com/elastic/beats/journalbeat/config"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
)
Expand Down Expand Up @@ -56,19 +56,3 @@ var (
CursorSeekFallback: config.SeekHead,
}
)

// Validate check the configuration of the input.
func (c *Config) Validate() error {
correctSeek := false
for _, s := range []string{"cursor", "head", "tail"} {
if c.Seek == s {
correctSeek = true
}
}

if !correctSeek {
return fmt.Errorf("incorrect value for seek: %s. possible values: cursor, head, tail", c.Seek)
}

return nil
}
12 changes: 7 additions & 5 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Input struct {
done chan struct{}
config Config
pipeline beat.Pipeline
client beat.Client
states map[string]checkpoint.JournalState
id uuid.UUID
logger *logp.Logger
Expand Down Expand Up @@ -122,7 +123,8 @@ func New(
// Run connects to the output, collects entries from the readers
// and then publishes the events.
func (i *Input) Run() {
client, err := i.pipeline.ConnectWith(beat.ClientConfig{
var err error
i.client, err = i.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: i.eventMeta,
Meta: nil,
Expand All @@ -135,13 +137,12 @@ func (i *Input) Run() {
i.logger.Error("Error connecting to output: %v", err)
return
}
defer client.Close()

i.publishAll(client)
i.publishAll()
}

// publishAll reads events from all readers and publishes them.
func (i *Input) publishAll(client beat.Client) {
func (i *Input) publishAll() {
out := make(chan *beat.Event)
defer close(out)

Expand Down Expand Up @@ -181,13 +182,14 @@ func (i *Input) publishAll(client beat.Client) {
case <-i.done:
return
case e := <-out:
client.Publish(*e)
i.client.Publish(*e)
}
}
}

// Stop stops all readers of the input.
func (i *Input) Stop() {
i.client.Close()
for _, r := range i.readers {
r.Close()
}
Expand Down
6 changes: 5 additions & 1 deletion journalbeat/reader/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package reader

import "time"
import (
"time"

"github.com/elastic/beats/journalbeat/config"
)

// Config stores the options of a reder.
type Config struct {
Expand Down
Loading

0 comments on commit 9db33a4

Please sign in to comment.