Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Journalbeat matches support && minor additions #8324

Merged
Merged
17 changes: 17 additions & 0 deletions journalbeat/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
FROM golang:1.10.3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this docker file will become useful but I wonder if it is used in the tests or if it was missing before?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was missing before. I has every dependency installed, so everyone can build and run journalbeat.

MAINTAINER Noémi Ványi <[email protected]>

RUN set -x && \
apt-get update && \
apt-get install -y --no-install-recommends \
python-pip virtualenv libsystemd-dev libc6-dev-i386 gcc-arm-linux-gnueabi && \
apt-get clean

RUN pip install --upgrade setuptools

# Setup work environment
ENV JOURNALBEAT_PATH /go/src/github.com/elastic/beats/journalbeat

RUN mkdir -p $JOURNALBEAT_PATH/build/coverage
WORKDIR $JOURNALBEAT_PATH
HEALTHCHECK CMD exit 0
9 changes: 9 additions & 0 deletions journalbeat/_meta/beat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ journalbeat.inputs:
# Position to start reading from journal. Valid values: head, tail, cursor
seek: tail

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []


#========================= Journalbeat global options ============================
#journalbeat:
# Name of the registry file. If a relative path is used, it is considered relative to the
Expand All @@ -43,3 +48,7 @@ journalbeat.inputs:

# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []
3 changes: 3 additions & 0 deletions journalbeat/_meta/fields.common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
description: >
Contains common fields available in all event types.
fields:
- name: read_timestamp
description: >
The time when Journalbeat read the journal entry.
- name: coredump
type: group
description: >
Expand Down
1 change: 1 addition & 0 deletions journalbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
BackoffFactor int `config:"backoff_factor" validate:"min=1"`
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
Seek string `config:"seek"`
Matches []string `config:"matches"`
}

// DefaultConfig are the defaults of a Journalbeat instance
Expand Down
8 changes: 8 additions & 0 deletions journalbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ Contains common fields available in all event types.



*`read_timestamp`*::
+
--
The time when Journalbeat read the journal entry.


--

[float]
== coredump fields

Expand Down
2 changes: 1 addition & 1 deletion journalbeat/include/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions journalbeat/input/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Config struct {
MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"`
// Seek is the method to read from journals.
Seek string `config:"seek"`
// Matches store the key value pairs to match entries.
Matches []string `config:"matches"`
}

var (
Expand Down
2 changes: 2 additions & 0 deletions journalbeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func New(
MaxBackoff: config.MaxBackoff,
BackoffFactor: config.BackoffFactor,
Seek: config.Seek,
Matches: config.Matches,
}

state := states[reader.LocalSystemJournalID]
Expand All @@ -81,6 +82,7 @@ func New(
MaxBackoff: config.MaxBackoff,
BackoffFactor: config.BackoffFactor,
Seek: config.Seek,
Matches: config.Matches,
}
state := states[p]
r, err := reader.New(cfg, done, state, logger)
Expand Down
9 changes: 9 additions & 0 deletions journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ journalbeat.inputs:
# Position to start reading from journal. Valid values: head, tail, cursor
seek: tail

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []


#========================= Journalbeat global options ============================
#journalbeat:
# Name of the registry file. If a relative path is used, it is considered relative to the
Expand All @@ -44,6 +49,10 @@ journalbeat.inputs:
# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []

#================================ General ======================================

# The name of the shipper that publishes the network data. It can be used to group
Expand Down
8 changes: 8 additions & 0 deletions journalbeat/journalbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ journalbeat.inputs:
# Position to start reading from journal. Valid values: head, tail, cursor
seek: tail

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []

#========================= Journalbeat global options ============================
#journalbeat:
# Name of the registry file. If a relative path is used, it is considered relative to the
Expand All @@ -44,6 +48,10 @@ journalbeat.inputs:
# Position to start reading from all journal. Possible values: head, tail, cursor
#seek: head

# Exact matching for field values of events.
# Matching for nginx entries: "systemd.unit=nginx"
#matches: []

#================================ General =====================================

# The name of the shipper that publishes the network data. It can be used to group
Expand Down
14 changes: 14 additions & 0 deletions journalbeat/reader/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import "github.com/coreos/go-systemd/sdjournal"

var (
journaldEventFields = map[string]string{
// provided by systemd journal
"COREDUMP_UNIT": "coredump.unit",
"COREDUMP_USER_UNIT": "coredump.user_unit",
"OBJECT_AUDIT_LOGINUID": "object.audit.login_uid",
Expand Down Expand Up @@ -66,5 +67,18 @@ var (
sdjournal.SD_JOURNAL_FIELD_TRANSPORT: "systemd.transport",
sdjournal.SD_JOURNAL_FIELD_UID: "process.uid",
sdjournal.SD_JOURNAL_FIELD_MESSAGE: "message",

// docker journald fields from: https://docs.docker.com/config/containers/logging/journald/
"CONTAINER_ID": "conatiner.id_truncated",
"CONTAINER_ID_FULL": "container.id",
"CONTAINER_NAME": "container.name",
"CONTAINER_TAG": "container.image.tag",
"CONTAINER_PARTIAL_MESSAGE": "container.partial",

// dropped fields
sdjournal.SD_JOURNAL_FIELD_MONOTONIC_TIMESTAMP: "", // saved in the registry
sdjournal.SD_JOURNAL_FIELD_SOURCE_REALTIME_TIMESTAMP: "", // saved in the registry
sdjournal.SD_JOURNAL_FIELD_CURSOR: "", // saved in the registry
"_SOURCE_MONOTONIC_TIMESTAMP": "", // received timestamp stored in @timestamp
}
)
67 changes: 63 additions & 4 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
package reader

import (
"fmt"
"io"
"os"
"strings"
"time"

"github.com/coreos/go-systemd/sdjournal"
Expand Down Expand Up @@ -50,6 +52,8 @@ type Config struct {
Backoff time.Duration
// BackoffFactor is the multiplier of Backoff.
BackoffFactor int
// Matches store the key value pairs to match entries.
Matches []string
}

// Reader reads entries from journal(s).
Expand Down Expand Up @@ -83,6 +87,11 @@ func New(c Config, done chan struct{}, state checkpoint.JournalState, logger *lo
}
}

err = setupMatches(j, c.Matches)
if err != nil {
return nil, err
}

r := &Reader{
journal: j,
changes: make(chan int),
Expand All @@ -108,6 +117,11 @@ func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logge
logger = logger.With("path", "local")
logger.Debug("New local journal is opened for reading")

err = setupMatches(j, c.Matches)
if err != nil {
return nil, err
}

r := &Reader{
journal: j,
changes: make(chan int),
Expand All @@ -119,6 +133,39 @@ func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logge
return r, nil
}

func setupMatches(j *sdjournal.Journal, matches []string) error {
for _, m := range matches {
elems := strings.Split(m, "=")
if len(elems) != 2 {
return fmt.Errorf("invalid match format: %s", m)
}

var p string
for journalKey, eventKey := range journaldEventFields {
if elems[0] == eventKey {
p = journalKey + "=" + elems[1]
}
}

if p == "" {
return fmt.Errorf("cannot create matcher: invalid event key: %s", elems[0])
}

logp.Debug("journal", "Added matcher expression: %s", p)

err := j.AddMatch(p)
if err != nil {
return fmt.Errorf("error adding match to journal %v", err)
}

err = j.AddDisjunction()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh this is an interesting API, this is to add a logical or for multiple matches.
Maybe in a future version we could allow users to configure OR / AND.

if err != nil {
return fmt.Errorf("error adding disjunction to journal: %v", err)
}
}
return nil
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit test?

// seek seeks to the position determined by the coniguration and cursor state.
func (r *Reader) seek(cursor string) {
if r.config.Seek == "cursor" {
Expand Down Expand Up @@ -213,21 +260,33 @@ func (r *Reader) readUntilNotNull(entries chan<- *beat.Event) error {
// toEvent creates a beat.Event from journal entries.
func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event {
fields := common.MapStr{}
for journalKey, eventKey := range journaldEventFields {
if entry.Fields[journalKey] != "" {
fields.Put(eventKey, entry.Fields[journalKey])
custom := common.MapStr{}

for k, v := range entry.Fields {
if kk, _ := journaldEventFields[k]; kk == "" {
normalized := strings.ToLower(strings.TrimLeft(k, "_"))
custom.Put(normalized, v)
} else {
fields.Put(kk, v)
}
}

if len(custom) != 0 {
fields["custom"] = custom
}

state := checkpoint.JournalState{
Path: r.config.Path,
Cursor: entry.Cursor,
RealtimeTimestamp: entry.RealtimeTimestamp,
MonotonicTimestamp: entry.MonotonicTimestamp,
}

fields["read_timestamp"] = time.Now()
receivedByJournal := time.Unix(0, int64(entry.RealtimeTimestamp)*1000)

event := beat.Event{
Timestamp: time.Now(),
Timestamp: receivedByJournal,
Fields: fields,
Private: state,
}
Expand Down
1 change: 1 addition & 0 deletions journalbeat/tests/system/config/journalbeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
journalbeat.inputs:
- paths: [{{ journal_path }}]
seek: {{ seek_method }}
matches: [{{ matches }}]

journalbeat.registry: {{ registry_file }}

Expand Down
33 changes: 33 additions & 0 deletions journalbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,39 @@ def test_read_events_with_existing_registry(self):
exit_code = journalbeat_proc.kill_and_wait()
assert exit_code == 0

@unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux")
def test_read_events_with_existing_registry(self):
"""
Journalbeat is able to pass matchers to the journal reader and read filtered messages.
"""

self.render_config_template(
journal_path=self.beat_path + "/tests/system/input/test.journal",
seek_method="head",
matches="syslog.priority=5",
path=os.path.abspath(self.working_dir) + "/log/*",
)
journalbeat_proc = self.start_beat()

required_log_snippets = [
# journalbeat can be started
"journalbeat is running",
# journalbeat can seek to the position defined in the cursor
"Added matcher expression",
# message can be read from test journal
"unhandled HKEY event 0x60b0",
"please report the conditions when this event happened to",
"unhandled HKEY event 0x60b1",
# Four events with priority 5 is publised
"journalbeat successfully published 4 events",
]
for snippet in required_log_snippets:
self.wait_until(lambda: self.log_contains(snippet),
name="Line in '{}' Journalbeat log".format(snippet))

exit_code = journalbeat_proc.kill_and_wait()
assert exit_code == 0


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the integration test!

if __name__ == '__main__':
unittest.main()