From c66a0841c4551f71b474c74cd4e3690a50d520c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?No=C3=A9mi=20V=C3=A1nyi?= Date: Thu, 4 Oct 2018 17:11:13 +0200 Subject: [PATCH] Journalbeat matches support && minor additions (#8324) ### Matching support From now on it's possible to match on journal entry fields in Journalbeat using the new option `include_matches`. This requires a list of key-value pairs separated by "=". The key has to be a Journalbeat event key (e.g. `systemd.unit`) and the value is the exact value the journal reader needs to find in the entries. Example configuration which returns NGINX and dhclient entries from the journal: ```yml include_matches: - "systemd.unit=nginx" - "process.name=dhclient" ``` ### Docker fields Added docker fields from: https://docs.docker.com/config/containers/logging/journald/ - `container.id` - `container.id_truncated` - `container.name` - `container.image.tag` - `container.partial` ### Parse timestamp of entries Journalbeat parses the timestamp of the entry and adds it to the event as `@timestamp`. The time of reading by Journalbeat is saved in `read_timestamp`. ### Save custom fields Custom fields from various sources are stored under `custom`. Field names are normalized, meaning the `"_"` prefix is removed and every letter is lowercased. ### Fields && processors From now on it is possible to configure `processors`, `fields`, etc. at the `input` level. 
### Metrics The size of each open reader is reported in bytes: ``` { "journalbeat": { "journals": { "journal_1": { "path": "system.journal", "size_in_bytes": 123124214 } } } } ``` --- journalbeat/Dockerfile | 17 + journalbeat/_meta/beat.yml | 15 + journalbeat/_meta/fields.common.yml | 235 ++++++------ journalbeat/beater/journalbeat.go | 6 + journalbeat/cmd/instance/metrics.go | 66 ++++ journalbeat/config/config.go | 1 + journalbeat/docs/fields.asciidoc | 335 ++---------------- journalbeat/include/fields.go | 2 +- journalbeat/input/config.go | 10 + journalbeat/input/input.go | 45 ++- journalbeat/journalbeat.reference.yml | 37 +- journalbeat/journalbeat.yml | 15 + journalbeat/reader/fields.go | 65 ++-- journalbeat/reader/journal.go | 82 ++++- journalbeat/reader/journal_test.go | 123 +++++++ .../tests/system/config/journalbeat.yml.j2 | 1 + journalbeat/tests/system/test_base.py | 33 ++ 17 files changed, 634 insertions(+), 454 deletions(-) create mode 100644 journalbeat/Dockerfile create mode 100644 journalbeat/cmd/instance/metrics.go create mode 100644 journalbeat/reader/journal_test.go diff --git a/journalbeat/Dockerfile b/journalbeat/Dockerfile new file mode 100644 index 000000000000..975fce8f3820 --- /dev/null +++ b/journalbeat/Dockerfile @@ -0,0 +1,17 @@ +FROM golang:1.10.3 +MAINTAINER Noémi Ványi + +RUN set -x && \ + apt-get update && \ + apt-get install -y --no-install-recommends \ + python-pip virtualenv libsystemd-dev libc6-dev-i386 gcc-arm-linux-gnueabi && \ + apt-get clean + +RUN pip install --upgrade setuptools + +# Setup work environment +ENV JOURNALBEAT_PATH /go/src/github.com/elastic/beats/journalbeat + +RUN mkdir -p $JOURNALBEAT_PATH/build/coverage +WORKDIR $JOURNALBEAT_PATH +HEALTHCHECK CMD exit 0 diff --git a/journalbeat/_meta/beat.yml b/journalbeat/_meta/beat.yml index 724686a91436..d0a9f974ee37 100644 --- a/journalbeat/_meta/beat.yml +++ b/journalbeat/_meta/beat.yml @@ -28,6 +28,17 @@ journalbeat.inputs: # Position to start reading from journal. 
Valid values: head, tail, cursor seek: tail + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #include_matches: [] + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + #========================= Journalbeat global options ============================ #journalbeat: # Name of the registry file. If a relative path is used, it is considered relative to the @@ -43,3 +54,7 @@ journalbeat.inputs: # Position to start reading from all journal. Possible values: head, tail, cursor #seek: head + + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] diff --git a/journalbeat/_meta/fields.common.yml b/journalbeat/_meta/fields.common.yml index 23378ad11587..8faa06cc6237 100644 --- a/journalbeat/_meta/fields.common.yml +++ b/journalbeat/_meta/fields.common.yml @@ -3,6 +3,9 @@ description: > Contains common fields available in all event types. fields: + - name: read_timestamp + description: > + The time when Journalbeat read the journal entry. - name: coredump type: group description: > @@ -16,117 +19,145 @@ type: keyword description: > Annotations of messages containing coredumps from user units. - - name: object + - name: journald type: group description: > - Fields to log on behalf of a different program. + Fields provided by journald. fields: - - name: audit + - name: object type: group description: > - Audit fields of event. + Fields to log on behalf of a different program. fields: - - name: loginuid + - name: audit + type: group + description: > + Audit fields of event. + fields: + - name: loginuid + type: long + required: false + example: 1000 + description: > + The login UID of the object process. 
+ - name: session + type: long + required: false + example: 3 + description: > + The audit session of the object process. + - name: cmd + type: keyword + required: false + example: "/lib/systemd/systemd --user" + description: > + The command line of the process. + - name: name + type: keyword + required: false + example: "/lib/systemd/systemd" + description: > + Name of the executable. + - name: executable + type: keyword + required: false + description: > + Path to the the executable. + example: "/lib/systemd/systemd" + - name: uid type: long required: false - example: 1000 description: > - The login UID of the object process. - - name: session + UID of the object process. + - name: gid type: long required: false - example: 3 description: > - The audit session of the object process. - - name: cmd - type: keyword - required: false - example: "/lib/systemd/systemd --user" - description: > - The command line of the process. - - name: name - type: keyword - required: false - example: "/lib/systemd/systemd" - description: > - Name of the executable. - - name: executable - type: keyword - required: false - description: > - Path to the the executable. - example: "/lib/systemd/systemd" - - name: uid - type: long - required: false - description: > - UID of the object process. - - name: gid - type: long - required: false - description: > - GID of the object process. - - name: pid - type: long - required: false - description: > - PID of the object process. - - name: systemd + GID of the object process. + - name: pid + type: long + required: false + description: > + PID of the object process. + - name: systemd + type: group + description: > + Systemd fields of event. + fields: + - name: owner_uid + type: long + required: false + description: > + The UID of the owner. + - name: session + type: keyword + required: false + description: > + The ID of the systemd session. + - name: unit + type: keyword + required: false + description: > + The name of the systemd unit. 
+ - name: user_unit + type: keyword + required: false + description: > + The name of the systemd user unit. + - name: kernel type: group description: > - Systemd fields of event. + Fields to log on behalf of a different program. fields: - - name: owner_uid - type: long + - name: device + type: keyword required: false description: > - The UID of the owner. - - name: session + The kernel device name. + - name: subsystem type: keyword required: false description: > - The ID of the systemd session. - - name: unit - type: keyword + The kernel subsystem name. + - name: device_symlinks + type: text required: false description: > - The name of the systemd unit. - - name: user_unit - type: keyword + Additional symlink names pointing to the device node in /dev. + - name: device_node_path + type: text required: false description: > - The name of the systemd user unit. - - name: kernel - type: group - description: > - Fields to log on behalf of a different program. - fields: - - name: device - type: keyword - required: false - description: > - The kernel device name. - - name: subsystem - type: keyword - required: false - description: > - The kernel subsystem name. - - name: device_symlinks - type: text - required: false - description: > - Additional symlink names pointing to the device node in /dev. - - name: device_node_path - type: text - required: false - description: > - The device node path of this device in /dev. - - name: device_name - type: text - required: false + The device node path of this device in /dev. + - name: device_name + type: text + required: false + description: > + The kernel device name as it shows up in the device tree below /sys. + - name: code + type: group description: > - The kernel device name as it shows up in the device tree below /sys. - - name: process + Fields of the code generating the event. 
+ fields: + - name: file + type: text + required: false + example: "../src/core/manager.c" + description: > + The name of the source file where the log is generated. + - name: function + type: text + required: false + example: "job_log_status_message" + description: > + The name of the function which generated the log message. + - name: line + type: long + required: false + example: 123 + description: > + The line number of the code which generated the log message. + - name: process type: group description: > Fields to log on behalf of a different program. @@ -184,6 +215,10 @@ example: 1 description: > The ID of the user which runs the process. + - name: capabilites + required: false + description: > + The effective capabilites of the process. - name: systemd type: group description: > @@ -217,6 +252,11 @@ example: "user-1234.slice" description: > The systemd slice unit. + - name: user_slice + type: keyword + required: false + description: > + The systemd user slice unit. - name: unit type: keyword required: false @@ -246,29 +286,6 @@ example: "dd8c974asdf01dbe2ef26d7fasdf264c9" description: > The boot ID for the boot the log was generated in. - - name: code - type: group - description: > - Fields of the code generating the event. - fields: - - name: file - type: text - required: false - example: "../src/core/manager.c" - description: > - The name of the source file where the log is generated. - - name: function - type: text - required: false - example: "job_log_status_message" - description: > - The name of the function which generated the log message. - - name: line - type: long - required: false - example: 123 - description: > - The line number of the code which generated the log message. 
- name: syslog type: group description: > diff --git a/journalbeat/beater/journalbeat.go b/journalbeat/beater/journalbeat.go index 77ae628acbf4..d6f47b61334c 100644 --- a/journalbeat/beater/journalbeat.go +++ b/journalbeat/beater/journalbeat.go @@ -23,9 +23,11 @@ import ( "time" "github.com/elastic/beats/journalbeat/checkpoint" + "github.com/elastic/beats/journalbeat/cmd/instance" "github.com/elastic/beats/journalbeat/input" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/journalbeat/config" @@ -44,6 +46,8 @@ type Journalbeat struct { // New returns a new Journalbeat instance func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { + cfgwarn.Experimental("Journalbeat is experimental.") + config := config.DefaultConfig if err := cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("error reading config file: %v", err) @@ -55,6 +59,8 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { return nil, err } + instance.SetupJournalMetrics() + var inputs []*input.Input for _, c := range config.Inputs { i, err := input.New(c, b.Publisher, done, cp.States()) diff --git a/journalbeat/cmd/instance/metrics.go b/journalbeat/cmd/instance/metrics.go new file mode 100644 index 000000000000..dee1d6eaeb9e --- /dev/null +++ b/journalbeat/cmd/instance/metrics.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package instance + +import ( + "fmt" + + "github.com/coreos/go-systemd/sdjournal" + + "github.com/elastic/beats/libbeat/monitoring" +) + +var ( + metrics *monitoring.Registry + journals map[string]*sdjournal.Journal +) + +// SetupJournalMetrics initializes and registers monitoring functions. +func SetupJournalMetrics() { + metrics = monitoring.Default.NewRegistry("journalbeat") + journals = make(map[string]*sdjournal.Journal) + + monitoring.NewFunc(metrics, "journals", reportJournalSizes, monitoring.Report) +} + +// AddJournalToMonitor adds a new journal which has to be monitored. +func AddJournalToMonitor(path string, journal *sdjournal.Journal) { + journals[path] = journal +} + +// StopMonitoringJournal stops monitoring the journal under the path. 
+func StopMonitoringJournal(path string) { + delete(journals, path) +} + +func reportJournalSizes(m monitoring.Mode, V monitoring.Visitor) { + i := 0 + for path, journal := range journals { + s, err := journal.GetUsage() + if err != nil { + continue + } + + ns := fmt.Sprintf("journal_%d", i) + monitoring.ReportNamespace(V, ns, func() { + monitoring.ReportString(V, "path", path) + monitoring.ReportInt(V, "size_in_bytes", int64(s)) + }) + i++ + } +} diff --git a/journalbeat/config/config.go b/journalbeat/config/config.go index d502abe608a2..85ba8ef1ab88 100644 --- a/journalbeat/config/config.go +++ b/journalbeat/config/config.go @@ -34,6 +34,7 @@ type Config struct { BackoffFactor int `config:"backoff_factor" validate:"min=1"` MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` Seek string `config:"seek"` + Matches []string `config:"include_matches"` } // DefaultConfig are the defaults of a Journalbeat instance diff --git a/journalbeat/docs/fields.asciidoc b/journalbeat/docs/fields.asciidoc index 107dfafd7033..61dc08cb0848 100644 --- a/journalbeat/docs/fields.asciidoc +++ b/journalbeat/docs/fields.asciidoc @@ -208,6 +208,14 @@ Contains common fields available in all event types. +*`read_timestamp`*:: ++ +-- +The time when Journalbeat read the journal entry. + + +-- + [float] == coredump fields @@ -236,7 +244,7 @@ Annotations of messages containing coredumps from user units. -- [float] -== object fields +== journald fields Fields to log on behalf of a different program. @@ -249,7 +257,7 @@ Audit fields of event. -*`object.audit.loginuid`*:: +*`journald.audit.loginuid`*:: + -- type: long @@ -258,12 +266,12 @@ example: 1000 required: False -The login UID of the object process. +The login UID of the source process. -- -*`object.audit.session`*:: +*`journald.audit.session`*:: + -- type: long @@ -272,12 +280,12 @@ example: 3 required: False -The audit session of the object process. +The audit session of the source process. 
-- -*`object.cmd`*:: +*`journald.cmd`*:: + -- type: keyword @@ -291,7 +299,7 @@ The command line of the process. -- -*`object.name`*:: +*`journald.name`*:: + -- type: keyword @@ -305,7 +313,7 @@ Name of the executable. -- -*`object.executable`*:: +*`journald.executable`*:: + -- type: keyword @@ -319,249 +327,21 @@ Path to the the executable. -- -*`object.uid`*:: -+ --- -type: long - -required: False - -UID of the object process. - - --- - -*`object.gid`*:: -+ --- -type: long - -required: False - -GID of the object process. - - --- - -*`object.pid`*:: -+ --- -type: long - -required: False - -PID of the object process. - - --- - -[float] -== systemd fields - -Systemd fields of event. - - - -*`object.systemd.owner_uid`*:: -+ --- -type: long - -required: False - -The UID of the owner. - - --- - -*`object.systemd.session`*:: -+ --- -type: keyword - -required: False - -The ID of the systemd session. - - --- - -*`object.systemd.unit`*:: -+ --- -type: keyword - -required: False - -The name of the systemd unit. - - --- - -*`object.systemd.user_unit`*:: -+ --- -type: keyword - -required: False - -The name of the systemd user unit. - - --- - -[float] -== kernel fields - -Fields to log on behalf of a different program. - - - -*`kernel.device`*:: -+ --- -type: keyword - -required: False - -The kernel device name. - - --- - -*`kernel.subsystem`*:: -+ --- -type: keyword - -required: False - -The kernel subsystem name. - - --- - -*`kernel.device_symlinks`*:: -+ --- -type: text - -required: False - -Additional symlink names pointing to the device node in /dev. - - --- - -*`kernel.device_node_path`*:: -+ --- -type: text - -required: False - -The device node path of this device in /dev. - - --- - -*`kernel.device_name`*:: -+ --- -type: text - -required: False - -The kernel device name as it shows up in the device tree below /sys. - - --- - -[float] -== process fields - -Fields to log on behalf of a different program. - - - -[float] -== audit fields - -Audit fields of event. 
- - - -*`process.audit.loginuid`*:: -+ --- -type: long - -example: 1000 - -required: False - -The login UID of the source process. - - --- - -*`process.audit.session`*:: +*`journald.pid`*:: + -- type: long -example: 3 - -required: False - -The audit session of the source process. - - --- - -*`process.cmd`*:: -+ --- -type: keyword - -example: /lib/systemd/systemd --user - -required: False - -The command line of the process. - - --- - -*`process.name`*:: -+ --- -type: keyword - -example: /lib/systemd/systemd - -required: False - -Name of the executable. - - --- - -*`process.executable`*:: -+ --- -type: keyword - -example: /lib/systemd/systemd +example: 1 required: False -Path to the the executable. +The ID of the process which logged the message. -- -*`process.pid`*:: +*`journald.gid`*:: + -- type: long @@ -570,12 +350,12 @@ example: 1 required: False -The ID of the process which logged the message. +The ID of the group which runs the process. -- -*`process.gid`*:: +*`journald.uid`*:: + -- type: long @@ -584,21 +364,17 @@ example: 1 required: False -The ID of the group which runs the process. +The ID of the user which runs the process. -- -*`process.uid`*:: +*`journald.capabilites`*:: + -- -type: long - -example: 1 - required: False -The ID of the user which runs the process. +The effective capabilites of the process. -- @@ -674,6 +450,18 @@ required: False The systemd slice unit. +-- + +*`systemd.user_slice`*:: ++ +-- +type: keyword + +required: False + +The systemd user slice unit. + + -- *`systemd.unit`*:: @@ -737,55 +525,6 @@ required: False The boot ID for the boot the log was generated in. --- - -[float] -== code fields - -Fields of the code generating the event. - - - -*`code.file`*:: -+ --- -type: text - -example: ../src/core/manager.c - -required: False - -The name of the source file where the log is generated. 
- - --- - -*`code.function`*:: -+ --- -type: text - -example: job_log_status_message - -required: False - -The name of the function which generated the log message. - - --- - -*`code.line`*:: -+ --- -type: long - -example: 123 - -required: False - -The line number of the code which generated the log message. - - -- [float] diff --git a/journalbeat/include/fields.go b/journalbeat/include/fields.go index 0a8db9a3bddd..aee02e903a41 100644 --- a/journalbeat/include/fields.go +++ b/journalbeat/include/fields.go @@ -31,5 +31,5 @@ func init() { // Asset returns asset data func Asset() string { - return "eJzsW11z27rRvvev2PHN+3ZGom3ZcXJ80ambtCdumzbTk3OtAwFLCjEJ8ACgbfXXdwCCJCiBEvXhZDqTXMQmAe4+u9hPAJ7CI67ugMqikOIMwHCT4x2cv3cv4G+yUoLkCyTm/AyAoaaKl4ZLcQd/PAMAeC+FIVxoTwJSjjnTQJ4Iz8kiR+ACSJ4DPqEwYFYl6uQM/LQ7R2IKghRoQShkVVG6l+Dm3kGmZNW8ibCv//21ZlppZLBYgV5pgwWbNgThEZXAHJaYl6gS/2EIIYRRCW4C2jWMR1w9S8WC94NgAO6FkIbYEQ0yhQK1JhlaFTldcZG1smpIlSw8YsdaJ5uINKr594BlGYegGkBy8RWpOXidjIRcZiAFLHBJ8tSiIcB4mqKyVlIqmSlS7FooUrGISkIgOxQCcG9JNDYr09pKk2DSOu+Qfy4zLirO1mjWMHIpsrUBhb9XXCG7g5TkGtdG8YUUpfW9q8vLy7WxrTIAfFliDQZ+ffhgxTBL9GtkdUlR6yQqgUatufP7UwpwvT96t5QNnF0StPGiYCMcYhh0C/j8IueLCx81mp8wnVrzPx9nSlYIGwGJYJBzgY0Mg+Dt/6+IfiTsf5KihYovSCtjY/Ym2m7sKMxbkHwmZmkDg0UyhGa80G3k5JsmsmbYB6Ed4WcNhOyVIPw8HkL5ShA+j4fgV+i4aP2L98wD47V8FjaPnipgjwhroZlY5vtH4U3vOgmwDlcT7TyIOMK12uPV4YkgLDUALYYBdJHy6PtAbMqlfrVUV5/fu1pi+MTpq4VvqxJfZdeMHNdIHKgWtba+AZCW1wCWGuhcr4qci0e9gcjgizkWzj1j3L4nOXg+jrmGUnJhbMntc16jNclcy3TB8GkQsZ00L4lZvgrkL2toLKPa0rluBnYijFU3pwK3aWdANNj6cSmfNVSlhRfo1ChEWGAun8FWCn3n9Bnre3vnj14mWOCNXkbLStFIMQ3/M73MkAQ/epnd6H/0MsdX8Z1HjreBzv/8ysPzktOldc8MmRvwe0gn7nqOA+uCpoeqKqG32+5RHeJxQF25uA1nvHnaP0fJtCGxKw1x8SSp2yKcR/RymEu/u3lzmV7dvp0xvL25pe/eUXZ1fU0IYzfpjL293CMqdfCsGlOpnM5UJQwvEOiK5m0EsGV4aKLwTDRkKFARgwy4iMTg9XR7VCCzq5vonFN0v06vZtc3/tnnhuks0VSWuFdYFkbJ3Nu4q8x8rdNE+iVHRRRdrjbli/W/p9sM+NI0ub3Evd4ZgVTDLedwKj95izCiAW7R5Mf2TZ1VrFnCHivfwrTfrTXDe55cjAEqMi5eEo3qaT+Yuzv3Qw41Xle1I1r5ELhRROhSqv2AG1XFceuVzmU2Eu5H+exg2pYjDGwKKfKn+vTr
a31mx/o5ZCn14Qc2XjmWxq4MspDSxHLH6Oav0wxj7+hPb2+IZunlFVvgDNPZLXub2hez2xv60x5rbGGFOcM9N5qMp4buWJLhsZqzNBoWrum3VWHYnw2pM+WRMvUAXSbJhVb0gkqFFwURJEOV0EN9pO5lLDR4XqLCVpE80OOm46SVoCYW1w+Q56tczHOZzbUhptJz7wsHCtQA88VYZwprrrYpkm2PTlE+zq7HI3ctmaiKBaqeeY0CH5SUucy+m1mXikvFzerblt4N1wZ+oxm49/qwbW9JDF/w3E5z6CN2TKgb/7bgG65HgucMheEpR3UKN9TVPtV7y/oAGRr8/pOzAeDRhDuAy2812V629ZEzfyFmgcR012H+XD+NuP5iv9v3DkxPPksgCTZUtmAPQ5hFCBoFa9ww8HydwEMwy33Gu+1ujaZpIqgUKc8qVbdYNsBP7Hs7SAw8kbyyX7p7No4mN/ZRSBMSm7S1gufk53+RjlUPx8SOuVe/2cffWjrSSTyMK9lUWsNxt+JabK5yMpUSdeXkzunKJpj5s4M2LbTAA92pSggusgga25H+R4oRaJqZr4nmCVXQUW0B4yc2ZuXM2S1+mFe4bqJ8j9P5n6wo2pCiPO/5JyMGt/lnKlVBTG9eG2Luq6zSBma3Zgmzy6vbCVzN7q7f3L25Tq6vZ+O06yDZUkV0Gco5iEIqFevXf2tCGZLp7Vzu1YIbRdTKza21RYkNBc7eS1T1QhHB3IPrH0i/ErJ6WmNcR4eeHnvXruqHeazzGADaxirX3LQ+ZQNUzWwNASol1b4lwl/sR00EDG6Wke4gjItUWs+mRLv45fjoXSVDP/DDUNbakoNqaIO1XFDnw1BK30ndEom0jKtyzA7CTuq1mTR3NnNZsS5HvbePUCr5xBlaMQ1hxJB42vrkR+trfrT3qbZr1YUgwtjcTZg3JJtNSqkGs5idmrivkobsumMj3eG94X5+H2ECn6XW3Bquy0kaiEJLcAIZxQlIBYxn3JBcUiQiGcTGhTZEUOwa1gEsD35isG9kkwgUhC65WHfdGIfdmanlEeb1cVz8hHlgZ62ezSwpkPGq2M79U03Cmdh+zH2Z4yq2eZDyWgSVniLRZnpFdwTSgBC4jMi7bMd1DYfrLs1tMTkXG9tVbaH4kenLeNPzn1gsP0uZ5Vh72jB3hdnOVPtvN2eXfN7RmaSPzn+8p39oniPE6zGwbbENv3mO1OZs5+b1mPVZvZTKzOsM0JX3RNClVA2/aevlA9e1W1gQzQ9DcdznBFTJqBOGbZfwBP+9wo4g8Ei/E7ArYuljL46hXThyTXXqAdhCYlHx3EBsI7mDMvLAdAuS9y3P+N2WjldOFphvXm7p1RKwvZ7YgeXBaaLm0xqt32/0JvuxfooQebDFQGCofn+uH3o627Tvd1rm1r3OIbs8fk0++rYi1nSfxNLrABExcqLokhukplInkKFHDv4fkyyBl3e389ubCRBVTKAs6QQKXuo/RA6ZdFLmxNiS/jgk//oFGkIeA0VhpJ5AtaiEqSbwzAWTzwMg+h3P4Rg8nSiPlBQ839wG2pdFTcYLqZAtiZkAwwUnYgKpQlxotkPa3hXDA5F8ifSb/6eb21aDeuCbJ6Z85J2lf3BtbDh9+DwljCnUGiMn9AWhxwnWsFkSxZ6Jwo7ZBCpdkTxfwaf79yGGJoo9VgsrvkHdxbK/h+8ibLvxtgjvV9QdUQgj2fak3H20M/z1QMNeQbCU7ATJKdBAKVkdWaOsYmfRh3L6LBn8+vBhk5G7b1mSUWe441h1FDeZ2f7vpBp0dzDjKhyb2scxqqlBQcpNTqT707FTsQtIxnmeslwK+NJe5bSN7QkKxijfmu5/AwAA//8V2TBe" + return 
"eJzMW91z47YRf/dfsXMvbWck2pYd38UPnbqXNnHbtDfN5VmBgCWFmAQYALSs/vUdgOCXBH5ZOl/ykIQiuPvbxX4DXsIT7u+ByiyT4gLAcJPiPbz76H6Af8hCCZJukJh3FwAMNVU8N1yKe/jzBQDARykM4UJ7EhBzTJkG8kx4SjYpAhdA0hTwGYUBs89RRxfgl907EksQJMN7UEjY2vAMtSFZ7l4FWZb/fN4i2MWw22IHqKMDZovwa/kjoDBqH3V4UamQFTUXi+seEiWLUb5/LwUsNDLY7EHvtcGMLSuC8IRKYApbTHNUkf+wLW4bRiG4adEuYTzhficVa/3eCwbgQQhpiH2jQcaQodYkQbsdbl+4SGpZNcRKZh6xY62jY0Qa1fprwLKM26AqQH4T2at3KlfymbNytypiY/siN78iPVZBm+2IAmr2RkIqE5ACNrglaWzVQYDxOEZlXSJXMlEki1pfH6JqIyMF6+xNP7hRgG6fLLnKaWVcuml0sDCEp40plQkXBWcB+iW0VIok8FLhbwVXyO4hJqnGwAp8IVluA9L11dVV4P2ofGWccADh58fvrIg2MpT7a3VPUetDeRvJNGrNXWD8UoLdvF4qZwoVxCmS1cEvO9yqPg8fF6YW5N1lyjeXPhxW/4Xl0vr1u3lmaYWz6YQIBikXWMk2KJT99xtJNVOcf5OsFgFfkBbGJsawFM37s8kygu4TMVsbpCy6IYTzlFKnk6Oo0Os4J0kx0bcrWMkbwfp+Hqz8jWB9mgfL7/D5ss5PPjqcmHfkTthi5UsknokhuG12FszrM0nYs88KtsFaRWcPqh/1QSH45pBFK3RWoC2mAcSB+vX3Abuqb49r7rJl+D1WmwyfOX2zVGRV59unkrFD0ROTik2p2a8AruY9gK8UYK33WcrFkw6iNPhyaKYnQXxgjNt3JAXP14HRkEsujO25fJ6vtCuZ688vGT4PSmEXrnNitm8ixucDhJZx6U1cVy8moe6rCL8E4GO7BaLB1udbudNQ5BZyS/dGIcIGU7kDW0EdBwUqGZ4lJPg4ZOlBggIVKU3B1nqHWXcoFsS8pyidrc6mkIyiS63oJZUKLzMiSIIqoq/oFjoBVxaKooMLuy2qsqy1UZHrSgHIwpYTF4Ka4zR9spy/ys06lclaG2IKvfbjkBMFrcDCbsvpthGtltezCYtqG6vzFJtNk7467GYnSOQaPFFkG1QdU50sVF08lyXsRVuWOXOimamzb3R0OKB5heOODmWG3LRnGHPy3h4PYKZs7uHgxTvncLsRLJVPFeAVphkcsPRJEB6u9JUj/aBnD1QGBJk8SOkZopwb/UTYYwOTwWHJfMwDSKYNR6YKHW7zg4Y9QcPX022g8T+/8z66pjJJfGjtDavJ1wPrgqaHqgqhh223+HpAXW83FSclOdnwlBvUp5qnRYFxjNTwZ2wTDjp6eJwzP1HKuCIxlgu5eJbUnQKtA5vzurjy4fabq/j67v2K4d3tHf3wgbLrmxtCGLuNV+z91YzQ2MCzexlL5XSmCuHOFemepnUYso17209gR1rFJHAR2OfDnH9SNLUmFumUU3T/u7xe3dz6Z5+glqtIU5njrNwgjJKpdzTXZ/kupUo3W46KKLrdH8sXmr7N9LoReI5Dp3o4nKWAVP3DrP564qw5YuJorUaTdicqp1jFgSXM2Pkapv2uZyzlRmmnw52IxO3pIJxph9JT9CYSLl4ijep5ntbGx5GvOUb/sjs9cxZpFBE6l2oecKOKMG6916lMJsL9Qe4O2zwXZxVS5M+hE/wK9VZqc0o+s1wtjbGEtpHShFLZwWhgyo4y9oF++/6WaBZfXbMNrjBe3bH3sf1hdXdLv52xxxZWO4W550qT4UzVKgZSmZyqu9HhUp9Cc8Wl4mb/tpVbxbWCXxW/8OD1YbumnBjuyql9if7YV2JC3fu3BV9xPRE8ZygMjzmqc1izLubUXTXrV8hQ4fefXPQAD8amHlx+UmFboboNuvA34zZ
ITHMv7q/l04R7cO4m2szLcB35LIGo1Y8PYG+HeIsQNApWuWErmOoIHlur3Ge8aUI0mqr8o1LEPClUWRzHPMWF/d2+JAaeSVrYL90lOEeTG/sopGkTW9Rh1XPy6z9Lx6qDY2HfuZ9+sY+/1HSkk7gfV3SstIrjuOJqbC7JmEKJMsm489y8Cmb+rKcetNbAW7pThRBcJAE0tpf4nxQT0FQrvySaZ1StWngAjF9YmZUzZ7f57Yks11WU73B695f6Fue7jn8yYnDIP2OpMmI66+oQ81AkhTawujNbWF1d3y3genV/8839NzfRzc1qmnYdpPLCaJ2hnIMopFKxbqo8EMqQRA9zeVAbbhRRe7e21BYlNhQ4e89RlRtFBHMPrtQi3fMGq6cDxmV06Oixcz2xfFiHirQeoHWscnVg7VM2QJXMDhCgUlLNLRH+Zj+qImDr2idpDim5iKX1bEq0i1+Ojx4rGbqBH/qy1kAOKqH1TryCJ28HKX2UuiUSqK73+ZRmapR6aSbV5e1UFqzJUR/tY3Xr1YppCCOGhNPWj/5teQeXdj7Vdq+aEEQYW7sF64pkNV6SqjeL2aWR+yqqyB46NtIR722Pg7sII/gktebWcF1O0kAUWoILSCguQCpgPOGGpJIiEVEvNi60IYJiU9v3YHn0C1sdv00ikBG65eLQdUMcxjNTzaOd16dx8QvWLTur9WxWUYaMF9kw9x9LEs7E5jH3ZY6r2NatlFcjKPQSiTbLazoSSFuEwGVE3mQ7rks4XDdpbsDkXGysd7WG4t8sX6abnv/EYvleyiTF0tP6uStMRlPtf92aMfm8ozNJn5z/eE//rnoOEC/fgTbE2PCbpkhtznZuXr6zPqu3Upl1mQGa8p4IupWq4resvbzn7zZqWBDMD31x3OcEVNGk2fBATPxZ8N8KbAgCD/Q7LXZZKH3M4ti2C0euqk49AFtIbAqeGgiNABsoE8/bBpB8rHl27yId80rJBlN9xO3oTx0G6okRLI9OEyWf2mj9aMab7A/lU4DIoy0GWobqRxnd0NPYpv191DIHx0J9dnn6nvzg24pQ030WSy8DRMDIiaJbbpCaQp1Bhg45+CNGSQQvH+7Wd7cLICpbQJ7TBWQ8138KHA/oKE+JsSX9aUj+8xNUhDwGisJIvYBiUwhTLGDHBZO7HhDdjuf1GDydII+YZDw9HgPNZVGS8UIqZFtiFsBww4lYQKwQN5qNSNtzlXQWks+BfvMPurrh1qsHfnzWxSdeefkX18aG08dPS8KYQq0xcHCaEXqaYBWbLVFsRxQ2zBZQ6IKk6R5+fPjYxlBFsadiY8Uvz2x9LPtn+7cA2+Z9XYR3K+qGKLQj2XBSbj4aDX8d0DArCOaSnSE5tTSQS1ZG1iCr0Cniazl9kgx+fvzumJG7C5uTScdZ01g1FI+Z2f7vrBp0d2HDKpya2qcxKqlBRvJjTqT5u85zsWuRDPM8Z7nU4ks7ldMQ2zMUjEG+Jd3/BwAA//87xgOV" } diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index bc38217696fd..d5c4b741236a 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -20,6 +20,9 @@ package input import ( "fmt" "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" ) // Config stores the options of an input. 
@@ -35,6 +38,13 @@ type Config struct { MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` // Seek is the method to read from journals. Seek string `config:"seek"` + // Matches stores the key value pairs to match entries. + Matches []string `config:"include_matches"` + + // Fields and tags to add to events. + common.EventMetadata `config:",inline"` + // Processors to run on events. + Processors processors.PluginConfig `config:"processors"` } var ( diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index fd0255c531b6..1a42d2cd986b 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -28,17 +28,20 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" ) // Input manages readers and forwards entries from journals. type Input struct { - readers []*reader.Reader - done chan struct{} - config Config - pipeline beat.Pipeline - states map[string]checkpoint.JournalState - id uuid.UUID - logger *logp.Logger + readers []*reader.Reader + done chan struct{} + config Config + pipeline beat.Pipeline + states map[string]checkpoint.JournalState + id uuid.UUID + logger *logp.Logger + eventMeta common.EventMetadata + processors beat.ProcessorList } // New returns a new Inout @@ -64,6 +67,7 @@ func New( MaxBackoff: config.MaxBackoff, BackoffFactor: config.BackoffFactor, Seek: config.Seek, + Matches: config.Matches, } state := states[reader.LocalSystemJournalID] @@ -81,6 +85,7 @@ func New( MaxBackoff: config.MaxBackoff, BackoffFactor: config.BackoffFactor, Seek: config.Seek, + Matches: config.Matches, } state := states[p] r, err := reader.New(cfg, done, state, logger) @@ -90,16 +95,24 @@ func New( readers = append(readers, r) } + processors, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + logger.Debugf("Event metadata of the input: %v", config.EventMetadata) + logger.Debugf("New input is
created for paths %v", config.Paths) return &Input{ - readers: readers, - done: done, - config: config, - pipeline: pipeline, - states: states, - id: id, - logger: logger, + readers: readers, + done: done, + config: config, + pipeline: pipeline, + states: states, + id: id, + logger: logger, + eventMeta: config.EventMetadata, + processors: processors, }, nil } @@ -108,9 +121,9 @@ func New( func (i *Input) Run() { client, err := i.pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, - EventMetadata: common.EventMetadata{}, + EventMetadata: i.eventMeta, Meta: nil, - Processor: nil, + Processor: i.processors, ACKCount: func(n int) { i.logger.Infof("journalbeat successfully published %d events", n) }, diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index 718208cb8119..27c12f0c4a29 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -28,6 +28,17 @@ journalbeat.inputs: # Position to start reading from journal. Valid values: head, tail, cursor seek: tail + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #include_matches: [] + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + #========================= Journalbeat global options ============================ #journalbeat: # Name of the registry file. If a relative path is used, it is considered relative to the @@ -44,6 +55,10 @@ journalbeat.inputs: # Position to start reading from all journal. Possible values: head, tail, cursor #seek: head + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] + #================================ General ====================================== # The name of the shipper that publishes the network data. 
It can be used to group @@ -161,8 +176,8 @@ journalbeat.inputs: # # event -> filter1 -> event1 -> filter2 ->event2 ... # -# The supported processors are drop_fields, drop_event, include_fields, and -# add_cloud_metadata. +# The supported processors are drop_fields, drop_event, include_fields, +# decode_json_fields, and add_cloud_metadata. # # For example, you can use the following processors to keep the fields that # contain CPU load percentages, but remove the fields that contain CPU ticks @@ -243,6 +258,24 @@ journalbeat.inputs: #- add_host_metadata: # netinfo.enabled: false # +# The following example enriches each event with process metadata using +# process IDs included in the event. +# +#processors: +#- add_process_metadata: +# match_pids: ["system.process.ppid"] +# target: system.process.parent +# +# The following example decodes fields containing JSON strings +# and replaces the strings with valid JSON objects. +# +#processors: +#- decode_json_fields: +# fields: ["field1", "field2", ...] +# process_array: false +# max_depth: 1 +# target: "" +# overwrite_keys: false #============================= Elastic Cloud ================================== diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index b1df03705bc6..617b70e3832f 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -28,6 +28,17 @@ journalbeat.inputs: # Position to start reading from journal. Valid values: head, tail, cursor seek: tail + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #include_matches: [] + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + #========================= Journalbeat global options ============================ #journalbeat: # Name of the registry file. 
If a relative path is used, it is considered relative to the @@ -44,6 +55,10 @@ journalbeat.inputs: # Position to start reading from all journal. Possible values: head, tail, cursor #seek: head + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] + #================================ General ===================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/reader/fields.go b/journalbeat/reader/fields.go index 23a82a0f0103..10ee1fbd63e5 100644 --- a/journalbeat/reader/fields.go +++ b/journalbeat/reader/fields.go @@ -21,42 +21,47 @@ import "github.com/coreos/go-systemd/sdjournal" var ( journaldEventFields = map[string]string{ - "COREDUMP_UNIT": "coredump.unit", - "COREDUMP_USER_UNIT": "coredump.user_unit", - "OBJECT_AUDIT_LOGINUID": "object.audit.login_uid", - "OBJECT_AUDIT_SESSION": "object.audit.session", - "OBJECT_CMDLINE": "object.cmd", - "OBJECT_COMM": "object.name", - "OBJECT_EXE": "object.executable", - "OBJECT_GID": "object.gid", - "OBJECT_PID": "object.pid", - "OBJECT_SYSTEMD_OWNER_UID": "object.systemd.owner_uid", - "OBJECT_SYSTEMD_SESSION": "object.systemd.session", - "OBJECT_SYSTEMD_UNIT": "object.systemd.unit", - "OBJECT_SYSTEMD_USER_UNIT": "object.systemd.user_unit", - "OBJECT_UID": "object.uid", - "_KERNEL_DEVICE": "kernel.device", - "_KERNEL_SUBSYSTEM": "kernel.subsystem", - "_SYSTEMD_INVOCATION_ID": "sytemd.invocation_id", - "_UDEV_DEVLINK": "kernel.device_symlinks", // TODO aggregate multiple elements - "_UDEV_DEVNODE": "kernel.device_node_path", - "_UDEV_SYSNAME": "kernel.device_name", + // provided by systemd journal + "COREDUMP_UNIT": "journald.coredump.unit", + "COREDUMP_USER_UNIT": "journald.coredump.user_unit", + "OBJECT_AUDIT_LOGINUID": "journald.object.audit.login_uid", + "OBJECT_AUDIT_SESSION": "journald.object.audit.session", + "OBJECT_CMDLINE": "journald.object.cmd", + "OBJECT_COMM": 
"journald.object.name", + "OBJECT_EXE": "journald.object.executable", + "OBJECT_GID": "journald.object.gid", + "OBJECT_PID": "journald.object.pid", + "OBJECT_SYSTEMD_OWNER_UID": "journald.object.systemd.owner_uid", + "OBJECT_SYSTEMD_SESSION": "journald.object.systemd.session", + "OBJECT_SYSTEMD_UNIT": "journald.object.systemd.unit", + "OBJECT_SYSTEMD_USER_UNIT": "journald.object.systemd.user_unit", + "OBJECT_UID": "journald.object.uid", + "_KERNEL_DEVICE": "journald.kernel.device", + "_KERNEL_SUBSYSTEM": "journald.kernel.subsystem", + "_SYSTEMD_INVOCATION_ID": "systemd.invocation_id", + "_SYSTEMD_USER_SLICE": "systemd.user_slice", + "_UDEV_DEVLINK": "journald.kernel.device_symlinks", // TODO aggregate multiple elements + "_UDEV_DEVNODE": "journald.kernel.device_node_path", + "_UDEV_SYSNAME": "journald.kernel.device_name", sdjournal.SD_JOURNAL_FIELD_AUDIT_LOGINUID: "process.audit.login_uid", sdjournal.SD_JOURNAL_FIELD_AUDIT_SESSION: "process.audit.session", sdjournal.SD_JOURNAL_FIELD_BOOT_ID: "host.boot_id", + sdjournal.SD_JOURNAL_FIELD_CAP_EFFECTIVE: "process.capabilites", sdjournal.SD_JOURNAL_FIELD_CMDLINE: "process.cmd", - sdjournal.SD_JOURNAL_FIELD_CODE_FILE: "code.file", - sdjournal.SD_JOURNAL_FIELD_CODE_FUNC: "code.func", - sdjournal.SD_JOURNAL_FIELD_CODE_LINE: "code.line", + sdjournal.SD_JOURNAL_FIELD_CODE_FILE: "journald.code.file", + sdjournal.SD_JOURNAL_FIELD_CODE_FUNC: "journald.code.func", + sdjournal.SD_JOURNAL_FIELD_CODE_LINE: "journald.code.line", sdjournal.SD_JOURNAL_FIELD_COMM: "process.name", sdjournal.SD_JOURNAL_FIELD_EXE: "process.executable", sdjournal.SD_JOURNAL_FIELD_GID: "process.uid", sdjournal.SD_JOURNAL_FIELD_HOSTNAME: "host.name", sdjournal.SD_JOURNAL_FIELD_MACHINE_ID: "host.id", + sdjournal.SD_JOURNAL_FIELD_MESSAGE: "message", sdjournal.SD_JOURNAL_FIELD_PID: "process.pid", sdjournal.SD_JOURNAL_FIELD_PRIORITY: "syslog.priority", sdjournal.SD_JOURNAL_FIELD_SYSLOG_FACILITY: "syslog.facility", sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER: 
"syslog.identifier", + sdjournal.SD_JOURNAL_FIELD_SYSLOG_PID: "syslog.pid", sdjournal.SD_JOURNAL_FIELD_SYSTEMD_CGROUP: "systemd.cgroup", sdjournal.SD_JOURNAL_FIELD_SYSTEMD_OWNER_UID: "systemd.owner_uid", sdjournal.SD_JOURNAL_FIELD_SYSTEMD_SESSION: "systemd.session", @@ -65,6 +70,18 @@ var ( sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "systemd.user_unit", sdjournal.SD_JOURNAL_FIELD_TRANSPORT: "systemd.transport", sdjournal.SD_JOURNAL_FIELD_UID: "process.uid", - sdjournal.SD_JOURNAL_FIELD_MESSAGE: "message", + + // docker journald fields from: https://docs.docker.com/config/containers/logging/journald/ + "CONTAINER_ID": "conatiner.id_truncated", + "CONTAINER_ID_FULL": "container.id", + "CONTAINER_NAME": "container.name", + "CONTAINER_TAG": "container.image.tag", + "CONTAINER_PARTIAL_MESSAGE": "container.partial", + + // dropped fields + sdjournal.SD_JOURNAL_FIELD_MONOTONIC_TIMESTAMP: "", // saved in the registry + sdjournal.SD_JOURNAL_FIELD_SOURCE_REALTIME_TIMESTAMP: "", // saved in the registry + sdjournal.SD_JOURNAL_FIELD_CURSOR: "", // saved in the registry + "_SOURCE_MONOTONIC_TIMESTAMP": "", // received timestamp stored in @timestamp } ) diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index 22aeb56a9246..7cfcb1802b85 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -18,14 +18,17 @@ package reader import ( + "fmt" "io" "os" + "strings" "time" "github.com/coreos/go-systemd/sdjournal" "github.com/pkg/errors" "github.com/elastic/beats/journalbeat/checkpoint" + "github.com/elastic/beats/journalbeat/cmd/instance" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -50,6 +53,8 @@ type Config struct { Backoff time.Duration // BackoffFactor is the multiplier of Backoff. BackoffFactor int + // Matches store the key value pairs to match entries. + Matches []string } // Reader reads entries from journal(s). 
@@ -83,6 +88,11 @@ func New(c Config, done chan struct{}, state checkpoint.JournalState, logger *lo } } + err = setupMatches(j, c.Matches) + if err != nil { + return nil, err + } + r := &Reader{ journal: j, changes: make(chan int), @@ -92,6 +102,8 @@ func New(c Config, done chan struct{}, state checkpoint.JournalState, logger *lo } r.seek(state.Cursor) + instance.AddJournalToMonitor(c.Path, j) + r.logger.Debug("New journal is opened for reading") return r, nil @@ -105,9 +117,15 @@ func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logge return nil, errors.Wrap(err, "failed to open local journal") } + c.Path = LocalSystemJournalID logger = logger.With("path", "local") logger.Debug("New local journal is opened for reading") + err = setupMatches(j, c.Matches) + if err != nil { + return nil, err + } + r := &Reader{ journal: j, changes: make(chan int), @@ -116,9 +134,46 @@ func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logge logger: logger, } r.seek(state.Cursor) + + instance.AddJournalToMonitor(c.Path, j) + return r, nil } +func setupMatches(j *sdjournal.Journal, matches []string) error { + for _, m := range matches { + elems := strings.SplitN(m, "=", 2) + if len(elems) != 2 || elems[0] == "" { + return fmt.Errorf("invalid match format: %s", m) + } + + var p string + for journalKey, eventKey := range journaldEventFields { + if elems[0] == eventKey { + p = journalKey + "=" + elems[1] + } + } + + // pass custom fields as is + if p == "" { + p = m + } + + logp.Debug("journal", "Added matcher expression: %s", p) + + err := j.AddMatch(p) + if err != nil { + return fmt.Errorf("error adding match to journal: %v", err) + } + + err = j.AddDisjunction() + if err != nil { + return fmt.Errorf("error adding disjunction to journal: %v", err) + } + } + return nil +} + // seek seeks to the position determined by the coniguration and cursor state.
func (r *Reader) seek(cursor string) { if r.config.Seek == "cursor" { @@ -213,12 +268,23 @@ func (r *Reader) readUntilNotNull(entries chan<- *beat.Event) error { // toEvent creates a beat.Event from journal entries. func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { fields := common.MapStr{} - for journalKey, eventKey := range journaldEventFields { - if entry.Fields[journalKey] != "" { - fields.Put(eventKey, entry.Fields[journalKey]) + custom := common.MapStr{} + + for k, v := range entry.Fields { + if kk, ok := journaldEventFields[k]; !ok { + normalized := strings.ToLower(strings.TrimLeft(k, "_")) + custom.Put(normalized, v) + } else { + if isKept(kk) { + fields.Put(kk, v) + } } } + if len(custom) != 0 { + fields["custom"] = custom + } + state := checkpoint.JournalState{ Path: r.config.Path, Cursor: entry.Cursor, @@ -226,14 +292,21 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { MonotonicTimestamp: entry.MonotonicTimestamp, } + fields["read_timestamp"] = time.Now() + receivedByJournal := time.Unix(0, int64(entry.RealtimeTimestamp)*1000) + event := beat.Event{ - Timestamp: time.Now(), + Timestamp: receivedByJournal, Fields: fields, Private: state, } return &event } +func isKept(key string) bool { + return key != "" +} + // stopOrWait waits for a journal event. func (r *Reader) stopOrWait() { select { @@ -260,5 +333,6 @@ func (r *Reader) wait() { // Close closes the underlying journal reader. func (r *Reader) Close() { + instance.StopMonitoringJournal(r.config.Path) r.journal.Close() } diff --git a/journalbeat/reader/journal_test.go b/journalbeat/reader/journal_test.go new file mode 100644 index 000000000000..2e7da74aa9a6 --- /dev/null +++ b/journalbeat/reader/journal_test.go @@ -0,0 +1,123 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. 
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package reader + +import ( + "reflect" + "testing" + + "github.com/coreos/go-systemd/sdjournal" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/journalbeat/checkpoint" + "github.com/elastic/beats/journalbeat/cmd/instance" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type ToEventTestCase struct { + entry sdjournal.JournalEntry + expectedFields common.MapStr +} + +type SetupMatchesTestCase struct { + matches []string + expectError bool +} + +func TestToEvent(t *testing.T) { + tests := []ToEventTestCase{ + // field name from fields.go + ToEventTestCase{ + entry: sdjournal.JournalEntry{ + Fields: map[string]string{ + sdjournal.SD_JOURNAL_FIELD_BOOT_ID: "123456", + }, + }, + expectedFields: common.MapStr{ + "host": common.MapStr{ + "boot_id": "123456", + }, + }, + }, + // custom field + ToEventTestCase{ + entry: sdjournal.JournalEntry{ + Fields: map[string]string{ + "my_custom_field": "value", + }, + }, + expectedFields: common.MapStr{ + "custom": common.MapStr{ + "my_custom_field": "value", + }, + }, + }, + // dropped field + ToEventTestCase{ + entry: sdjournal.JournalEntry{ + Fields: map[string]string{ + "_SOURCE_MONOTONIC_TIMESTAMP": "value", + }, + }, + expectedFields: common.MapStr{}, + }, + } + + instance.SetupJournalMetrics() + r, err := NewLocal(Config{Path: "dummy.journal"}, nil, checkpoint.JournalState{}, 
logp.NewLogger("test")) + if err != nil { + t.Fatalf("error creating test journal: %v", err) + } + for _, test := range tests { + event := r.toEvent(&test.entry) + delete(event.Fields, "read_timestamp") + assert.True(t, reflect.DeepEqual(event.Fields, test.expectedFields)) + } +} + +func TestSetupMatches(t *testing.T) { + tests := []SetupMatchesTestCase{ + // correct filter expression + SetupMatchesTestCase{ + matches: []string{"systemd.unit=nginx"}, + expectError: false, + }, + // custom field + SetupMatchesTestCase{ + matches: []string{"_MY_CUSTOM_FIELD=value"}, + expectError: false, + }, + // incorrect separator + SetupMatchesTestCase{ + matches: []string{"systemd.unit~nginx"}, + expectError: true, + }, + } + journal, err := sdjournal.NewJournal() + if err != nil { + t.Fatalf("error while creating test journal: %v", err) + } + + for _, test := range tests { + err = setupMatches(journal, test.matches) + if (err != nil) != test.expectError { + t.Errorf("unexpected outcome of setupMatches: error: '%v', expected error: %v", err, test.expectError) + } + } +} diff --git a/journalbeat/tests/system/config/journalbeat.yml.j2 b/journalbeat/tests/system/config/journalbeat.yml.j2 index e2cefdcb6d02..ca8cc1d862cf 100644 --- a/journalbeat/tests/system/config/journalbeat.yml.j2 +++ b/journalbeat/tests/system/config/journalbeat.yml.j2 @@ -2,6 +2,7 @@ journalbeat.inputs: - paths: [{{ journal_path }}] seek: {{ seek_method }} + include_matches: [{{ matches }}] journalbeat.registry: {{ registry_file }} diff --git a/journalbeat/tests/system/test_base.py b/journalbeat/tests/system/test_base.py index 7b7d16aefb4e..be68e6c08c3b 100644 --- a/journalbeat/tests/system/test_base.py +++ b/journalbeat/tests/system/test_base.py @@ -109,6 +109,39 @@ def test_read_events_with_existing_registry(self): exit_code = journalbeat_proc.kill_and_wait() assert exit_code == 0 + @unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux") + def test_read_events_with_include_matches(self): +
""" + Journalbeat is able to pass matchers to the journal reader and read filtered messages. + """ + + self.render_config_template( + journal_path=self.beat_path + "/tests/system/input/test.journal", + seek_method="head", + matches="syslog.priority=5", + path=os.path.abspath(self.working_dir) + "/log/*", + ) + journalbeat_proc = self.start_beat() + + required_log_snippets = [ + # journalbeat can be started + "journalbeat is running", + # journalbeat can seek to the position defined in the cursor + "Added matcher expression", + # message can be read from test journal + "unhandled HKEY event 0x60b0", + "please report the conditions when this event happened to", + "unhandled HKEY event 0x60b1", + # Four events with priority 5 is publised + "journalbeat successfully published 4 events", + ] + for snippet in required_log_snippets: + self.wait_until(lambda: self.log_contains(snippet), + name="Line in '{}' Journalbeat log".format(snippet)) + + exit_code = journalbeat_proc.kill_and_wait() + assert exit_code == 0 + if __name__ == '__main__': unittest.main()