diff --git a/journalbeat/Dockerfile b/journalbeat/Dockerfile new file mode 100644 index 000000000000..975fce8f3820 --- /dev/null +++ b/journalbeat/Dockerfile @@ -0,0 +1,17 @@ +FROM golang:1.10.3 +MAINTAINER Noémi Ványi + +RUN set -x && \ + apt-get update && \ + apt-get install -y --no-install-recommends \ + python-pip virtualenv libsystemd-dev libc6-dev-i386 gcc-arm-linux-gnueabi && \ + apt-get clean + +RUN pip install --upgrade setuptools + +# Setup work environment +ENV JOURNALBEAT_PATH /go/src/github.com/elastic/beats/journalbeat + +RUN mkdir -p $JOURNALBEAT_PATH/build/coverage +WORKDIR $JOURNALBEAT_PATH +HEALTHCHECK CMD exit 0 diff --git a/journalbeat/_meta/beat.yml b/journalbeat/_meta/beat.yml index 724686a91436..d0a9f974ee37 100644 --- a/journalbeat/_meta/beat.yml +++ b/journalbeat/_meta/beat.yml @@ -28,6 +28,17 @@ journalbeat.inputs: # Position to start reading from journal. Valid values: head, tail, cursor seek: tail + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #include_matches: [] + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + #========================= Journalbeat global options ============================ #journalbeat: # Name of the registry file. If a relative path is used, it is considered relative to the @@ -43,3 +54,7 @@ journalbeat.inputs: # Position to start reading from all journal. Possible values: head, tail, cursor #seek: head + + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] diff --git a/journalbeat/_meta/fields.common.yml b/journalbeat/_meta/fields.common.yml index 23378ad11587..8faa06cc6237 100644 --- a/journalbeat/_meta/fields.common.yml +++ b/journalbeat/_meta/fields.common.yml @@ -3,6 +3,9 @@ description: > Contains common fields available in all event types. fields: + - name: read_timestamp + description: > + The time when Journalbeat read the journal entry. - name: coredump type: group description: > @@ -16,117 +19,145 @@ type: keyword description: > Annotations of messages containing coredumps from user units. - - name: object + - name: journald type: group description: > - Fields to log on behalf of a different program. + Fields provided by journald. fields: - - name: audit + - name: object type: group description: > - Audit fields of event. + Fields to log on behalf of a different program. fields: - - name: loginuid + - name: audit + type: group + description: > + Audit fields of event. + fields: + - name: loginuid + type: long + required: false + example: 1000 + description: > + The login UID of the object process. + - name: session + type: long + required: false + example: 3 + description: > + The audit session of the object process. + - name: cmd + type: keyword + required: false + example: "/lib/systemd/systemd --user" + description: > + The command line of the process. + - name: name + type: keyword + required: false + example: "/lib/systemd/systemd" + description: > + Name of the executable. + - name: executable + type: keyword + required: false + description: > + Path to the the executable. + example: "/lib/systemd/systemd" + - name: uid type: long required: false - example: 1000 description: > - The login UID of the object process. - - name: session + UID of the object process. + - name: gid type: long required: false - example: 3 description: > - The audit session of the object process. - - name: cmd - type: keyword - required: false - example: "/lib/systemd/systemd --user" - description: > - The command line of the process. - - name: name - type: keyword - required: false - example: "/lib/systemd/systemd" - description: > - Name of the executable. - - name: executable - type: keyword - required: false - description: > - Path to the the executable. - example: "/lib/systemd/systemd" - - name: uid - type: long - required: false - description: > - UID of the object process. - - name: gid - type: long - required: false - description: > - GID of the object process. - - name: pid - type: long - required: false - description: > - PID of the object process. - - name: systemd + GID of the object process. + - name: pid + type: long + required: false + description: > + PID of the object process. + - name: systemd + type: group + description: > + Systemd fields of event. + fields: + - name: owner_uid + type: long + required: false + description: > + The UID of the owner. + - name: session + type: keyword + required: false + description: > + The ID of the systemd session. + - name: unit + type: keyword + required: false + description: > + The name of the systemd unit. + - name: user_unit + type: keyword + required: false + description: > + The name of the systemd user unit. + - name: kernel type: group description: > - Systemd fields of event. + Fields to log on behalf of a different program. fields: - - name: owner_uid - type: long + - name: device + type: keyword required: false description: > - The UID of the owner. - - name: session + The kernel device name. + - name: subsystem type: keyword required: false description: > - The ID of the systemd session. - - name: unit - type: keyword + The kernel subsystem name. + - name: device_symlinks + type: text required: false description: > - The name of the systemd unit. - - name: user_unit - type: keyword + Additional symlink names pointing to the device node in /dev. + - name: device_node_path + type: text required: false description: > - The name of the systemd user unit. - - name: kernel - type: group - description: > - Fields to log on behalf of a different program. - fields: - - name: device - type: keyword - required: false - description: > - The kernel device name. - - name: subsystem - type: keyword - required: false - description: > - The kernel subsystem name. - - name: device_symlinks - type: text - required: false - description: > - Additional symlink names pointing to the device node in /dev. - - name: device_node_path - type: text - required: false - description: > - The device node path of this device in /dev. - - name: device_name - type: text - required: false + The device node path of this device in /dev. + - name: device_name + type: text + required: false + description: > + The kernel device name as it shows up in the device tree below /sys. + - name: code + type: group description: > - The kernel device name as it shows up in the device tree below /sys. - - name: process + Fields of the code generating the event. + fields: + - name: file + type: text + required: false + example: "../src/core/manager.c" + description: > + The name of the source file where the log is generated. + - name: function + type: text + required: false + example: "job_log_status_message" + description: > + The name of the function which generated the log message. + - name: line + type: long + required: false + example: 123 + description: > + The line number of the code which generated the log message. + - name: process type: group description: > Fields to log on behalf of a different program. @@ -184,6 +215,10 @@ example: 1 description: > The ID of the user which runs the process. + - name: capabilites + required: false + description: > + The effective capabilites of the process. - name: systemd type: group description: > @@ -217,6 +252,11 @@ example: "user-1234.slice" description: > The systemd slice unit. + - name: user_slice + type: keyword + required: false + description: > + The systemd user slice unit. - name: unit type: keyword required: false @@ -246,29 +286,6 @@ example: "dd8c974asdf01dbe2ef26d7fasdf264c9" description: > The boot ID for the boot the log was generated in. - - name: code - type: group - description: > - Fields of the code generating the event. - fields: - - name: file - type: text - required: false - example: "../src/core/manager.c" - description: > - The name of the source file where the log is generated. - - name: function - type: text - required: false - example: "job_log_status_message" - description: > - The name of the function which generated the log message. - - name: line - type: long - required: false - example: 123 - description: > - The line number of the code which generated the log message. - name: syslog type: group description: > diff --git a/journalbeat/beater/journalbeat.go b/journalbeat/beater/journalbeat.go index 77ae628acbf4..d6f47b61334c 100644 --- a/journalbeat/beater/journalbeat.go +++ b/journalbeat/beater/journalbeat.go @@ -23,9 +23,11 @@ import ( "time" "github.com/elastic/beats/journalbeat/checkpoint" + "github.com/elastic/beats/journalbeat/cmd/instance" "github.com/elastic/beats/journalbeat/input" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/common/cfgwarn" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/journalbeat/config" @@ -44,6 +46,8 @@ type Journalbeat struct { // New returns a new Journalbeat instance func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { + cfgwarn.Experimental("Journalbeat is experimental.") + config := config.DefaultConfig if err := cfg.Unpack(&config); err != nil { return nil, fmt.Errorf("error reading config file: %v", err) @@ -55,6 +59,8 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { return nil, err } + instance.SetupJournalMetrics() + var inputs []*input.Input for _, c := range config.Inputs { i, err := input.New(c, b.Publisher, done, cp.States()) diff --git a/journalbeat/cmd/instance/metrics.go b/journalbeat/cmd/instance/metrics.go new file mode 100644 index 000000000000..dee1d6eaeb9e --- /dev/null +++ b/journalbeat/cmd/instance/metrics.go @@ -0,0 +1,66 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package instance + +import ( + "fmt" + + "github.com/coreos/go-systemd/sdjournal" + + "github.com/elastic/beats/libbeat/monitoring" +) + +var ( + metrics *monitoring.Registry + journals map[string]*sdjournal.Journal +) + +// SetupJournalMetrics initializes and registers monitoring functions. +func SetupJournalMetrics() { + metrics = monitoring.Default.NewRegistry("journalbeat") + journals = make(map[string]*sdjournal.Journal) + + monitoring.NewFunc(metrics, "journals", reportJournalSizes, monitoring.Report) +} + +// AddJournalToMonitor adds a new journal which has to be monitored. +func AddJournalToMonitor(path string, journal *sdjournal.Journal) { + journals[path] = journal +} + +// StopMonitoringJournal stops monitoring the journal under the path. +func StopMonitoringJournal(path string) { + delete(journals, path) +} + +func reportJournalSizes(m monitoring.Mode, V monitoring.Visitor) { + i := 0 + for path, journal := range journals { + s, err := journal.GetUsage() + if err != nil { + continue + } + + ns := fmt.Sprintf("journal_%d", i) + monitoring.ReportNamespace(V, ns, func() { + monitoring.ReportString(V, "path", path) + monitoring.ReportInt(V, "size_in_bytes", int64(s)) + }) + i++ + } +} diff --git a/journalbeat/config/config.go b/journalbeat/config/config.go index d502abe608a2..85ba8ef1ab88 100644 --- a/journalbeat/config/config.go +++ b/journalbeat/config/config.go @@ -34,6 +34,7 @@ type Config struct { BackoffFactor int `config:"backoff_factor" validate:"min=1"` MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` Seek string `config:"seek"` + Matches []string `config:"include_matches"` } // DefaultConfig are the defaults of a Journalbeat instance diff --git a/journalbeat/docs/fields.asciidoc b/journalbeat/docs/fields.asciidoc index 107dfafd7033..61dc08cb0848 100644 --- a/journalbeat/docs/fields.asciidoc +++ b/journalbeat/docs/fields.asciidoc @@ -208,6 +208,14 @@ Contains common fields available in all event types. +*`read_timestamp`*:: ++ +-- +The time when Journalbeat read the journal entry. + + +-- + [float] == coredump fields @@ -236,7 +244,7 @@ Annotations of messages containing coredumps from user units. -- [float] -== object fields +== journald fields Fields to log on behalf of a different program. @@ -249,7 +257,7 @@ Audit fields of event. -*`object.audit.loginuid`*:: +*`journald.audit.loginuid`*:: + -- type: long @@ -258,12 +266,12 @@ example: 1000 required: False -The login UID of the object process. +The login UID of the source process. -- -*`object.audit.session`*:: +*`journald.audit.session`*:: + -- type: long @@ -272,12 +280,12 @@ example: 3 required: False -The audit session of the object process. +The audit session of the source process. -- -*`object.cmd`*:: +*`journald.cmd`*:: + -- type: keyword @@ -291,7 +299,7 @@ The command line of the process. -- -*`object.name`*:: +*`journald.name`*:: + -- type: keyword @@ -305,7 +313,7 @@ Name of the executable. -- -*`object.executable`*:: +*`journald.executable`*:: + -- type: keyword @@ -319,249 +327,21 @@ Path to the the executable. -- -*`object.uid`*:: -+ --- -type: long - -required: False - -UID of the object process. - - --- - -*`object.gid`*:: -+ --- -type: long - -required: False - -GID of the object process. - - --- - -*`object.pid`*:: -+ --- -type: long - -required: False - -PID of the object process. - - --- - -[float] -== systemd fields - -Systemd fields of event. - - - -*`object.systemd.owner_uid`*:: -+ --- -type: long - -required: False - -The UID of the owner. - - --- - -*`object.systemd.session`*:: -+ --- -type: keyword - -required: False - -The ID of the systemd session. - - --- - -*`object.systemd.unit`*:: -+ --- -type: keyword - -required: False - -The name of the systemd unit. - - --- - -*`object.systemd.user_unit`*:: -+ --- -type: keyword - -required: False - -The name of the systemd user unit. - - --- - -[float] -== kernel fields - -Fields to log on behalf of a different program. - - - -*`kernel.device`*:: -+ --- -type: keyword - -required: False - -The kernel device name. - - --- - -*`kernel.subsystem`*:: -+ --- -type: keyword - -required: False - -The kernel subsystem name. - - --- - -*`kernel.device_symlinks`*:: -+ --- -type: text - -required: False - -Additional symlink names pointing to the device node in /dev. - - --- - -*`kernel.device_node_path`*:: -+ --- -type: text - -required: False - -The device node path of this device in /dev. - - --- - -*`kernel.device_name`*:: -+ --- -type: text - -required: False - -The kernel device name as it shows up in the device tree below /sys. - - --- - -[float] -== process fields - -Fields to log on behalf of a different program. - - - -[float] -== audit fields - -Audit fields of event. - - - -*`process.audit.loginuid`*:: -+ --- -type: long - -example: 1000 - -required: False - -The login UID of the source process. - - --- - -*`process.audit.session`*:: +*`journald.pid`*:: + -- type: long -example: 3 - -required: False - -The audit session of the source process. - - --- - -*`process.cmd`*:: -+ --- -type: keyword - -example: /lib/systemd/systemd --user - -required: False - -The command line of the process. - - --- - -*`process.name`*:: -+ --- -type: keyword - -example: /lib/systemd/systemd - -required: False - -Name of the executable. - - --- - -*`process.executable`*:: -+ --- -type: keyword - -example: /lib/systemd/systemd +example: 1 required: False -Path to the the executable. +The ID of the process which logged the message. -- -*`process.pid`*:: +*`journald.gid`*:: + -- type: long @@ -570,12 +350,12 @@ example: 1 required: False -The ID of the process which logged the message. +The ID of the group which runs the process. -- -*`process.gid`*:: +*`journald.uid`*:: + -- type: long @@ -584,21 +364,17 @@ example: 1 required: False -The ID of the group which runs the process. +The ID of the user which runs the process. -- -*`process.uid`*:: +*`journald.capabilites`*:: + -- -type: long - -example: 1 - required: False -The ID of the user which runs the process. +The effective capabilites of the process. -- @@ -674,6 +450,18 @@ required: False The systemd slice unit. +-- + +*`systemd.user_slice`*:: ++ +-- +type: keyword + +required: False + +The systemd user slice unit. + + -- *`systemd.unit`*:: @@ -737,55 +525,6 @@ required: False The boot ID for the boot the log was generated in. --- - -[float] -== code fields - -Fields of the code generating the event. - - - -*`code.file`*:: -+ --- -type: text - -example: ../src/core/manager.c - -required: False - -The name of the source file where the log is generated. - - --- - -*`code.function`*:: -+ --- -type: text - -example: job_log_status_message - -required: False - -The name of the function which generated the log message. - - --- - -*`code.line`*:: -+ --- -type: long - -example: 123 - -required: False - -The line number of the code which generated the log message. - - -- [float] diff --git a/journalbeat/include/fields.go b/journalbeat/include/fields.go index 0a8db9a3bddd..aee02e903a41 100644 --- a/journalbeat/include/fields.go +++ b/journalbeat/include/fields.go @@ -31,5 +31,5 @@ func init() { // Asset returns asset data func Asset() string { - return "eJzsW11z27rRvvev2PHN+3ZGom3ZcXJ80ambtCdumzbTk3OtAwFLCjEJ8ACgbfXXdwCCJCiBEvXhZDqTXMQmAe4+u9hPAJ7CI67ugMqikOIMwHCT4x2cv3cv4G+yUoLkCyTm/AyAoaaKl4ZLcQd/PAMAeC+FIVxoTwJSjjnTQJ4Iz8kiR+ACSJ4DPqEwYFYl6uQM/LQ7R2IKghRoQShkVVG6l+Dm3kGmZNW8ibCv//21ZlppZLBYgV5pgwWbNgThEZXAHJaYl6gS/2EIIYRRCW4C2jWMR1w9S8WC94NgAO6FkIbYEQ0yhQK1JhlaFTldcZG1smpIlSw8YsdaJ5uINKr594BlGYegGkBy8RWpOXidjIRcZiAFLHBJ8tSiIcB4mqKyVlIqmSlS7FooUrGISkIgOxQCcG9JNDYr09pKk2DSOu+Qfy4zLirO1mjWMHIpsrUBhb9XXCG7g5TkGtdG8YUUpfW9q8vLy7WxrTIAfFliDQZ+ffhgxTBL9GtkdUlR6yQqgUatufP7UwpwvT96t5QNnF0StPGiYCMcYhh0C/j8IueLCx81mp8wnVrzPx9nSlYIGwGJYJBzgY0Mg+Dt/6+IfiTsf5KihYovSCtjY/Ym2m7sKMxbkHwmZmkDg0UyhGa80G3k5JsmsmbYB6Ed4WcNhOyVIPw8HkL5ShA+j4fgV+i4aP2L98wD47V8FjaPnipgjwhroZlY5vtH4U3vOgmwDlcT7TyIOMK12uPV4YkgLDUALYYBdJHy6PtAbMqlfrVUV5/fu1pi+MTpq4VvqxJfZdeMHNdIHKgWtba+AZCW1wCWGuhcr4qci0e9gcjgizkWzj1j3L4nOXg+jrmGUnJhbMntc16jNclcy3TB8GkQsZ00L4lZvgrkL2toLKPa0rluBnYijFU3pwK3aWdANNj6cSmfNVSlhRfo1ChEWGAun8FWCn3n9Bnre3vnj14mWOCNXkbLStFIMQ3/M73MkAQ/epnd6H/0MsdX8Z1HjreBzv/8ysPzktOldc8MmRvwe0gn7nqOA+uCpoeqKqG32+5RHeJxQF25uA1nvHnaP0fJtCGxKw1x8SSp2yKcR/RymEu/u3lzmV7dvp0xvL25pe/eUXZ1fU0IYzfpjL293CMqdfCsGlOpnM5UJQwvEOiK5m0EsGV4aKLwTDRkKFARgwy4iMTg9XR7VCCzq5vonFN0v06vZtc3/tnnhuks0VSWuFdYFkbJ3Nu4q8x8rdNE+iVHRRRdrjbli/W/p9sM+NI0ub3Evd4ZgVTDLedwKj95izCiAW7R5Mf2TZ1VrFnCHivfwrTfrTXDe55cjAEqMi5eEo3qaT+Yuzv3Qw41Xle1I1r5ELhRROhSqv2AG1XFceuVzmU2Eu5H+exg2pYjDGwKKfKn+vTra31mx/o5ZCn14Qc2XjmWxq4MspDSxHLH6Oav0wxj7+hPb2+IZunlFVvgDNPZLXub2hez2xv60x5rbGGFOcM9N5qMp4buWJLhsZqzNBoWrum3VWHYnw2pM+WRMvUAXSbJhVb0gkqFFwURJEOV0EN9pO5lLDR4XqLCVpE80OOm46SVoCYW1w+Q56tczHOZzbUhptJz7wsHCtQA88VYZwprrrYpkm2PTlE+zq7HI3ctmaiKBaqeeY0CH5SUucy+m1mXikvFzerblt4N1wZ+oxm49/qwbW9JDF/w3E5z6CN2TKgb/7bgG65HgucMheEpR3UKN9TVPtV7y/oAGRr8/pOzAeDRhDuAy2812V629ZEzfyFmgcR012H+XD+NuP5iv9v3DkxPPksgCTZUtmAPQ5hFCBoFa9ww8HydwEMwy33Gu+1ujaZpIqgUKc8qVbdYNsBP7Hs7SAw8kbyyX7p7No4mN/ZRSBMSm7S1gufk53+RjlUPx8SOuVe/2cffWjrSSTyMK9lUWsNxt+JabK5yMpUSdeXkzunKJpj5s4M2LbTAA92pSggusgga25H+R4oRaJqZr4nmCVXQUW0B4yc2ZuXM2S1+mFe4bqJ8j9P5n6wo2pCiPO/5JyMGt/lnKlVBTG9eG2Luq6zSBma3Zgmzy6vbCVzN7q7f3L25Tq6vZ+O06yDZUkV0Gco5iEIqFevXf2tCGZLp7Vzu1YIbRdTKza21RYkNBc7eS1T1QhHB3IPrH0i/ErJ6WmNcR4eeHnvXruqHeazzGADaxirX3LQ+ZQNUzWwNASol1b4lwl/sR00EDG6Wke4gjItUWs+mRLv45fjoXSVDP/DDUNbakoNqaIO1XFDnw1BK30ndEom0jKtyzA7CTuq1mTR3NnNZsS5HvbePUCr5xBlaMQ1hxJB42vrkR+trfrT3qbZr1YUgwtjcTZg3JJtNSqkGs5idmrivkobsumMj3eG94X5+H2ECn6XW3Bquy0kaiEJLcAIZxQlIBYxn3JBcUiQiGcTGhTZEUOwa1gEsD35isG9kkwgUhC65WHfdGIfdmanlEeb1cVz8hHlgZ62ezSwpkPGq2M79U03Cmdh+zH2Z4yq2eZDyWgSVniLRZnpFdwTSgBC4jMi7bMd1DYfrLs1tMTkXG9tVbaH4kenLeNPzn1gsP0uZ5Vh72jB3hdnOVPtvN2eXfN7RmaSPzn+8p39oniPE6zGwbbENv3mO1OZs5+b1mPVZvZTKzOsM0JX3RNClVA2/aevlA9e1W1gQzQ9DcdznBFTJqBOGbZfwBP+9wo4g8Ei/E7ArYuljL46hXThyTXXqAdhCYlHx3EBsI7mDMvLAdAuS9y3P+N2WjldOFphvXm7p1RKwvZ7YgeXBaaLm0xqt32/0JvuxfooQebDFQGCofn+uH3o627Tvd1rm1r3OIbs8fk0++rYi1nSfxNLrABExcqLokhukplInkKFHDv4fkyyBl3e389ubCRBVTKAs6QQKXuo/RA6ZdFLmxNiS/jgk//oFGkIeA0VhpJ5AtaiEqSbwzAWTzwMg+h3P4Rg8nSiPlBQ839wG2pdFTcYLqZAtiZkAwwUnYgKpQlxotkPa3hXDA5F8ifSb/6eb21aDeuCbJ6Z85J2lf3BtbDh9+DwljCnUGiMn9AWhxwnWsFkSxZ6Jwo7ZBCpdkTxfwaf79yGGJoo9VgsrvkHdxbK/h+8ibLvxtgjvV9QdUQgj2fak3H20M/z1QMNeQbCU7ATJKdBAKVkdWaOsYmfRh3L6LBn8+vBhk5G7b1mSUWe441h1FDeZ2f7vpBp0dzDjKhyb2scxqqlBQcpNTqT707FTsQtIxnmeslwK+NJe5bSN7QkKxijfmu5/AwAA//8V2TBe" + return "eJzMW91z47YRf/dfsXMvbWck2pYd38UPnbqXNnHbtDfN5VmBgCWFmAQYALSs/vUdgOCXBH5ZOl/ykIQiuPvbxX4DXsIT7u+ByiyT4gLAcJPiPbz76H6Af8hCCZJukJh3FwAMNVU8N1yKe/jzBQDARykM4UJ7EhBzTJkG8kx4SjYpAhdA0hTwGYUBs89RRxfgl907EksQJMN7UEjY2vAMtSFZ7l4FWZb/fN4i2MWw22IHqKMDZovwa/kjoDBqH3V4UamQFTUXi+seEiWLUb5/LwUsNDLY7EHvtcGMLSuC8IRKYApbTHNUkf+wLW4bRiG4adEuYTzhficVa/3eCwbgQQhpiH2jQcaQodYkQbsdbl+4SGpZNcRKZh6xY62jY0Qa1fprwLKM26AqQH4T2at3KlfymbNytypiY/siN78iPVZBm+2IAmr2RkIqE5ACNrglaWzVQYDxOEZlXSJXMlEki1pfH6JqIyMF6+xNP7hRgG6fLLnKaWVcuml0sDCEp40plQkXBWcB+iW0VIok8FLhbwVXyO4hJqnGwAp8IVluA9L11dVV4P2ofGWccADh58fvrIg2MpT7a3VPUetDeRvJNGrNXWD8UoLdvF4qZwoVxCmS1cEvO9yqPg8fF6YW5N1lyjeXPhxW/4Xl0vr1u3lmaYWz6YQIBikXWMk2KJT99xtJNVOcf5OsFgFfkBbGJsawFM37s8kygu4TMVsbpCy6IYTzlFKnk6Oo0Os4J0kx0bcrWMkbwfp+Hqz8jWB9mgfL7/D5ss5PPjqcmHfkTthi5UsknokhuG12FszrM0nYs88KtsFaRWcPqh/1QSH45pBFK3RWoC2mAcSB+vX3Abuqb49r7rJl+D1WmwyfOX2zVGRV59unkrFD0ROTik2p2a8AruY9gK8UYK33WcrFkw6iNPhyaKYnQXxgjNt3JAXP14HRkEsujO25fJ6vtCuZ688vGT4PSmEXrnNitm8ixucDhJZx6U1cVy8moe6rCL8E4GO7BaLB1udbudNQ5BZyS/dGIcIGU7kDW0EdBwUqGZ4lJPg4ZOlBggIVKU3B1nqHWXcoFsS8pyidrc6mkIyiS63oJZUKLzMiSIIqoq/oFjoBVxaKooMLuy2qsqy1UZHrSgHIwpYTF4Ka4zR9spy/ys06lclaG2IKvfbjkBMFrcDCbsvpthGtltezCYtqG6vzFJtNk7467GYnSOQaPFFkG1QdU50sVF08lyXsRVuWOXOimamzb3R0OKB5heOODmWG3LRnGHPy3h4PYKZs7uHgxTvncLsRLJVPFeAVphkcsPRJEB6u9JUj/aBnD1QGBJk8SOkZopwb/UTYYwOTwWHJfMwDSKYNR6YKHW7zg4Y9QcPX022g8T+/8z66pjJJfGjtDavJ1wPrgqaHqgqhh223+HpAXW83FSclOdnwlBvUp5qnRYFxjNTwZ2wTDjp6eJwzP1HKuCIxlgu5eJbUnQKtA5vzurjy4fabq/j67v2K4d3tHf3wgbLrmxtCGLuNV+z91YzQ2MCzexlL5XSmCuHOFemepnUYso17209gR1rFJHAR2OfDnH9SNLUmFumUU3T/u7xe3dz6Z5+glqtIU5njrNwgjJKpdzTXZ/kupUo3W46KKLrdH8sXmr7N9LoReI5Dp3o4nKWAVP3DrP564qw5YuJorUaTdicqp1jFgSXM2Pkapv2uZyzlRmmnw52IxO3pIJxph9JT9CYSLl4ijep5ntbGx5GvOUb/sjs9cxZpFBE6l2oecKOKMG6916lMJsL9Qe4O2zwXZxVS5M+hE/wK9VZqc0o+s1wtjbGEtpHShFLZwWhgyo4y9oF++/6WaBZfXbMNrjBe3bH3sf1hdXdLv52xxxZWO4W550qT4UzVKgZSmZyqu9HhUp9Cc8Wl4mb/tpVbxbWCXxW/8OD1YbumnBjuyql9if7YV2JC3fu3BV9xPRE8ZygMjzmqc1izLubUXTXrV8hQ4fefXPQAD8amHlx+UmFboboNuvA34zZITHMv7q/l04R7cO4m2szLcB35LIGo1Y8PYG+HeIsQNApWuWErmOoIHlur3Ge8aUI0mqr8o1LEPClUWRzHPMWF/d2+JAaeSVrYL90lOEeTG/sopGkTW9Rh1XPy6z9Lx6qDY2HfuZ9+sY+/1HSkk7gfV3SstIrjuOJqbC7JmEKJMsm489y8Cmb+rKcetNbAW7pThRBcJAE0tpf4nxQT0FQrvySaZ1StWngAjF9YmZUzZ7f57Yks11WU73B695f6Fue7jn8yYnDIP2OpMmI66+oQ81AkhTawujNbWF1d3y3genV/8839NzfRzc1qmnYdpPLCaJ2hnIMopFKxbqo8EMqQRA9zeVAbbhRRe7e21BYlNhQ4e89RlRtFBHMPrtQi3fMGq6cDxmV06Oixcz2xfFiHirQeoHWscnVg7VM2QJXMDhCgUlLNLRH+Zj+qImDr2idpDim5iKX1bEq0i1+Ojx4rGbqBH/qy1kAOKqH1TryCJ28HKX2UuiUSqK73+ZRmapR6aSbV5e1UFqzJUR/tY3Xr1YppCCOGhNPWj/5teQeXdj7Vdq+aEEQYW7sF64pkNV6SqjeL2aWR+yqqyB46NtIR722Pg7sII/gktebWcF1O0kAUWoILSCguQCpgPOGGpJIiEVEvNi60IYJiU9v3YHn0C1sdv00ikBG65eLQdUMcxjNTzaOd16dx8QvWLTur9WxWUYaMF9kw9x9LEs7E5jH3ZY6r2NatlFcjKPQSiTbLazoSSFuEwGVE3mQ7rks4XDdpbsDkXGysd7WG4t8sX6abnv/EYvleyiTF0tP6uStMRlPtf92aMfm8ozNJn5z/eE//rnoOEC/fgTbE2PCbpkhtznZuXr6zPqu3Upl1mQGa8p4IupWq4resvbzn7zZqWBDMD31x3OcEVNGk2fBATPxZ8N8KbAgCD/Q7LXZZKH3M4ti2C0euqk49AFtIbAqeGgiNABsoE8/bBpB8rHl27yId80rJBlN9xO3oTx0G6okRLI9OEyWf2mj9aMab7A/lU4DIoy0GWobqRxnd0NPYpv191DIHx0J9dnn6nvzg24pQ030WSy8DRMDIiaJbbpCaQp1Bhg45+CNGSQQvH+7Wd7cLICpbQJ7TBWQ8138KHA/oKE+JsSX9aUj+8xNUhDwGisJIvYBiUwhTLGDHBZO7HhDdjuf1GDydII+YZDw9HgPNZVGS8UIqZFtiFsBww4lYQKwQN5qNSNtzlXQWks+BfvMPurrh1qsHfnzWxSdeefkX18aG08dPS8KYQq0xcHCaEXqaYBWbLVFsRxQ2zBZQ6IKk6R5+fPjYxlBFsadiY8Uvz2x9LPtn+7cA2+Z9XYR3K+qGKLQj2XBSbj4aDX8d0DArCOaSnSE5tTSQS1ZG1iCr0Cniazl9kgx+fvzumJG7C5uTScdZ01g1FI+Z2f7vrBp0d2HDKpya2qcxKqlBRvJjTqT5u85zsWuRDPM8Z7nU4ks7ldMQ2zMUjEG+Jd3/BwAA//87xgOV" } diff --git a/journalbeat/input/config.go b/journalbeat/input/config.go index bc38217696fd..d5c4b741236a 100644 --- a/journalbeat/input/config.go +++ b/journalbeat/input/config.go @@ -20,6 +20,9 @@ package input import ( "fmt" "time" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" ) // Config stores the options of an input. @@ -35,6 +38,13 @@ type Config struct { MaxBackoff time.Duration `config:"max_backoff" validate:"min=0,nonzero"` // Seek is the method to read from journals. Seek string `config:"seek"` + // Matches store the key value pairs to match entries. + Matches []string `config:"include_matches"` + + // Fields and tags to add to events. + common.EventMetadata `config:",inline"` + // Processors to run on events. + Processors processors.PluginConfig `config:"processors"` } var ( diff --git a/journalbeat/input/input.go b/journalbeat/input/input.go index fd0255c531b6..1a42d2cd986b 100644 --- a/journalbeat/input/input.go +++ b/journalbeat/input/input.go @@ -28,17 +28,20 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/processors" ) // Input manages readers and forwards entries from journals. type Input struct { - readers []*reader.Reader - done chan struct{} - config Config - pipeline beat.Pipeline - states map[string]checkpoint.JournalState - id uuid.UUID - logger *logp.Logger + readers []*reader.Reader + done chan struct{} + config Config + pipeline beat.Pipeline + states map[string]checkpoint.JournalState + id uuid.UUID + logger *logp.Logger + eventMeta common.EventMetadata + processors beat.ProcessorList } // New returns a new Inout @@ -64,6 +67,7 @@ func New( MaxBackoff: config.MaxBackoff, BackoffFactor: config.BackoffFactor, Seek: config.Seek, + Matches: config.Matches, } state := states[reader.LocalSystemJournalID] @@ -81,6 +85,7 @@ func New( MaxBackoff: config.MaxBackoff, BackoffFactor: config.BackoffFactor, Seek: config.Seek, + Matches: config.Matches, } state := states[p] r, err := reader.New(cfg, done, state, logger) @@ -90,16 +95,24 @@ func New( readers = append(readers, r) } + processors, err := processors.New(config.Processors) + if err != nil { + return nil, err + } + logp.Info(">>> %v", config.EventMetadata) + logger.Debugf("New input is created for paths %v", config.Paths) return &Input{ - readers: readers, - done: done, - config: config, - pipeline: pipeline, - states: states, - id: id, - logger: logger, + readers: readers, + done: done, + config: config, + pipeline: pipeline, + states: states, + id: id, + logger: logger, + eventMeta: config.EventMetadata, + processors: processors, }, nil } @@ -108,9 +121,9 @@ func New( func (i *Input) Run() { client, err := i.pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, - EventMetadata: common.EventMetadata{}, + EventMetadata: i.eventMeta, Meta: nil, - Processor: nil, + Processor: i.processors, ACKCount: func(n int) { i.logger.Infof("journalbeat successfully published %d events", n) }, diff --git a/journalbeat/journalbeat.reference.yml b/journalbeat/journalbeat.reference.yml index 718208cb8119..27c12f0c4a29 100644 --- a/journalbeat/journalbeat.reference.yml +++ b/journalbeat/journalbeat.reference.yml @@ -28,6 +28,17 @@ journalbeat.inputs: # Position to start reading from journal. Valid values: head, tail, cursor seek: tail + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #include_matches: [] + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + #========================= Journalbeat global options ============================ #journalbeat: # Name of the registry file. If a relative path is used, it is considered relative to the @@ -44,6 +55,10 @@ journalbeat.inputs: # Position to start reading from all journal. Possible values: head, tail, cursor #seek: head + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] + #================================ General ====================================== # The name of the shipper that publishes the network data. It can be used to group @@ -161,8 +176,8 @@ journalbeat.inputs: # # event -> filter1 -> event1 -> filter2 ->event2 ... # -# The supported processors are drop_fields, drop_event, include_fields, and -# add_cloud_metadata. +# The supported processors are drop_fields, drop_event, include_fields, +# decode_json_fields, and add_cloud_metadata. # # For example, you can use the following processors to keep the fields that # contain CPU load percentages, but remove the fields that contain CPU ticks @@ -243,6 +258,24 @@ journalbeat.inputs: #- add_host_metadata: # netinfo.enabled: false # +# The following example enriches each event with process metadata using +# process IDs included in the event. +# +#processors: +#- add_process_metadata: +# match_pids: ["system.process.ppid"] +# target: system.process.parent +# +# The following example decodes fields containing JSON strings +# and replaces the strings with valid JSON objects. +# +#processors: +#- decode_json_fields: +# fields: ["field1", "field2", ...] +# process_array: false +# max_depth: 1 +# target: "" +# overwrite_keys: false #============================= Elastic Cloud ================================== diff --git a/journalbeat/journalbeat.yml b/journalbeat/journalbeat.yml index b1df03705bc6..617b70e3832f 100644 --- a/journalbeat/journalbeat.yml +++ b/journalbeat/journalbeat.yml @@ -28,6 +28,17 @@ journalbeat.inputs: # Position to start reading from journal. Valid values: head, tail, cursor seek: tail + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #include_matches: [] + + # Optional fields that you can specify to add additional information to the + # output. Fields can be scalar values, arrays, dictionaries, or any nested + # combination of these. + #fields: + # env: staging + + #========================= Journalbeat global options ============================ #journalbeat: # Name of the registry file. If a relative path is used, it is considered relative to the @@ -44,6 +55,10 @@ journalbeat.inputs: # Position to start reading from all journal. Possible values: head, tail, cursor #seek: head + # Exact matching for field values of events. + # Matching for nginx entries: "systemd.unit=nginx" + #matches: [] + #================================ General ===================================== # The name of the shipper that publishes the network data. It can be used to group diff --git a/journalbeat/reader/fields.go b/journalbeat/reader/fields.go index 23a82a0f0103..10ee1fbd63e5 100644 --- a/journalbeat/reader/fields.go +++ b/journalbeat/reader/fields.go @@ -21,42 +21,47 @@ import "github.com/coreos/go-systemd/sdjournal" var ( journaldEventFields = map[string]string{ - "COREDUMP_UNIT": "coredump.unit", - "COREDUMP_USER_UNIT": "coredump.user_unit", - "OBJECT_AUDIT_LOGINUID": "object.audit.login_uid", - "OBJECT_AUDIT_SESSION": "object.audit.session", - "OBJECT_CMDLINE": "object.cmd", - "OBJECT_COMM": "object.name", - "OBJECT_EXE": "object.executable", - "OBJECT_GID": "object.gid", - "OBJECT_PID": "object.pid", - "OBJECT_SYSTEMD_OWNER_UID": "object.systemd.owner_uid", - "OBJECT_SYSTEMD_SESSION": "object.systemd.session", - "OBJECT_SYSTEMD_UNIT": "object.systemd.unit", - "OBJECT_SYSTEMD_USER_UNIT": "object.systemd.user_unit", - "OBJECT_UID": "object.uid", - "_KERNEL_DEVICE": "kernel.device", - "_KERNEL_SUBSYSTEM": "kernel.subsystem", - "_SYSTEMD_INVOCATION_ID": "sytemd.invocation_id", - "_UDEV_DEVLINK": "kernel.device_symlinks", // TODO aggregate multiple elements - "_UDEV_DEVNODE": "kernel.device_node_path", - "_UDEV_SYSNAME": "kernel.device_name", + // provided by systemd journal + "COREDUMP_UNIT": "journald.coredump.unit", + "COREDUMP_USER_UNIT": "journald.coredump.user_unit", + "OBJECT_AUDIT_LOGINUID": "journald.object.audit.login_uid", + "OBJECT_AUDIT_SESSION": "journald.object.audit.session", + "OBJECT_CMDLINE": "journald.object.cmd", + "OBJECT_COMM": "journald.object.name", + "OBJECT_EXE": "journald.object.executable", + "OBJECT_GID": "journald.object.gid", + "OBJECT_PID": "journald.object.pid", + "OBJECT_SYSTEMD_OWNER_UID": "journald.object.systemd.owner_uid", + "OBJECT_SYSTEMD_SESSION": "journald.object.systemd.session", + "OBJECT_SYSTEMD_UNIT": "journald.object.systemd.unit", + "OBJECT_SYSTEMD_USER_UNIT": "journald.object.systemd.user_unit", + "OBJECT_UID": "journald.object.uid", + "_KERNEL_DEVICE": "journald.kernel.device", + "_KERNEL_SUBSYSTEM": "journald.kernel.subsystem", + "_SYSTEMD_INVOCATION_ID": "systemd.invocation_id", + "_SYSTEMD_USER_SLICE": "systemd.user_slice", + "_UDEV_DEVLINK": "journald.kernel.device_symlinks", // TODO aggregate multiple elements + "_UDEV_DEVNODE": "journald.kernel.device_node_path", + "_UDEV_SYSNAME": "journald.kernel.device_name", sdjournal.SD_JOURNAL_FIELD_AUDIT_LOGINUID: "process.audit.login_uid", sdjournal.SD_JOURNAL_FIELD_AUDIT_SESSION: "process.audit.session", sdjournal.SD_JOURNAL_FIELD_BOOT_ID: "host.boot_id", + sdjournal.SD_JOURNAL_FIELD_CAP_EFFECTIVE: "process.capabilites", sdjournal.SD_JOURNAL_FIELD_CMDLINE: "process.cmd", - sdjournal.SD_JOURNAL_FIELD_CODE_FILE: "code.file", - sdjournal.SD_JOURNAL_FIELD_CODE_FUNC: "code.func", - sdjournal.SD_JOURNAL_FIELD_CODE_LINE: "code.line", + sdjournal.SD_JOURNAL_FIELD_CODE_FILE: "journald.code.file", + sdjournal.SD_JOURNAL_FIELD_CODE_FUNC: "journald.code.func", + sdjournal.SD_JOURNAL_FIELD_CODE_LINE: "journald.code.line", sdjournal.SD_JOURNAL_FIELD_COMM: "process.name", sdjournal.SD_JOURNAL_FIELD_EXE: "process.executable", sdjournal.SD_JOURNAL_FIELD_GID: "process.uid", sdjournal.SD_JOURNAL_FIELD_HOSTNAME: "host.name", sdjournal.SD_JOURNAL_FIELD_MACHINE_ID: "host.id", + sdjournal.SD_JOURNAL_FIELD_MESSAGE: "message", sdjournal.SD_JOURNAL_FIELD_PID: "process.pid", sdjournal.SD_JOURNAL_FIELD_PRIORITY: "syslog.priority", sdjournal.SD_JOURNAL_FIELD_SYSLOG_FACILITY: "syslog.facility", sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER: "syslog.identifier", + sdjournal.SD_JOURNAL_FIELD_SYSLOG_PID: "syslog.pid", sdjournal.SD_JOURNAL_FIELD_SYSTEMD_CGROUP: "systemd.cgroup", sdjournal.SD_JOURNAL_FIELD_SYSTEMD_OWNER_UID: "systemd.owner_uid", sdjournal.SD_JOURNAL_FIELD_SYSTEMD_SESSION: "systemd.session", @@ -65,6 +70,18 @@ var ( sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "systemd.user_unit", sdjournal.SD_JOURNAL_FIELD_TRANSPORT: "systemd.transport", sdjournal.SD_JOURNAL_FIELD_UID: "process.uid", - sdjournal.SD_JOURNAL_FIELD_MESSAGE: "message", + + // docker journald fields from: https://docs.docker.com/config/containers/logging/journald/ + "CONTAINER_ID": "conatiner.id_truncated", + "CONTAINER_ID_FULL": "container.id", + "CONTAINER_NAME": "container.name", + "CONTAINER_TAG": "container.image.tag", + "CONTAINER_PARTIAL_MESSAGE": "container.partial", + + // dropped fields + sdjournal.SD_JOURNAL_FIELD_MONOTONIC_TIMESTAMP: "", // saved in the registry + sdjournal.SD_JOURNAL_FIELD_SOURCE_REALTIME_TIMESTAMP: "", // saved in the registry + sdjournal.SD_JOURNAL_FIELD_CURSOR: "", // saved in the registry + "_SOURCE_MONOTONIC_TIMESTAMP": "", // received timestamp stored in @timestamp } ) diff --git a/journalbeat/reader/journal.go b/journalbeat/reader/journal.go index 22aeb56a9246..7cfcb1802b85 100644 --- a/journalbeat/reader/journal.go +++ b/journalbeat/reader/journal.go @@ -18,14 +18,17 @@ package reader import ( + "fmt" "io" "os" + "strings" "time" "github.com/coreos/go-systemd/sdjournal" "github.com/pkg/errors" "github.com/elastic/beats/journalbeat/checkpoint" + "github.com/elastic/beats/journalbeat/cmd/instance" "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" @@ -50,6 +53,8 @@ type Config struct { Backoff time.Duration // BackoffFactor is the multiplier of Backoff. BackoffFactor int + // Matches store the key value pairs to match entries. + Matches []string } // Reader reads entries from journal(s). @@ -83,6 +88,11 @@ func New(c Config, done chan struct{}, state checkpoint.JournalState, logger *lo } } + err = setupMatches(j, c.Matches) + if err != nil { + return nil, err + } + r := &Reader{ journal: j, changes: make(chan int), @@ -92,6 +102,8 @@ func New(c Config, done chan struct{}, state checkpoint.JournalState, logger *lo } r.seek(state.Cursor) + instance.AddJournalToMonitor(c.Path, j) + r.logger.Debug("New journal is opened for reading") return r, nil @@ -105,9 +117,15 @@ func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logge return nil, errors.Wrap(err, "failed to open local journal") } + c.Path = LocalSystemJournalID logger = logger.With("path", "local") logger.Debug("New local journal is opened for reading") + err = setupMatches(j, c.Matches) + if err != nil { + return nil, err + } + r := &Reader{ journal: j, changes: make(chan int), @@ -116,9 +134,46 @@ func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logge logger: logger, } r.seek(state.Cursor) + + instance.AddJournalToMonitor(c.Path, j) + return r, nil } +func setupMatches(j *sdjournal.Journal, matches []string) error { + for _, m := range matches { + elems := strings.Split(m, "=") + if len(elems) != 2 { + return fmt.Errorf("invalid match format: %s", m) + } + + var p string + for journalKey, eventKey := range journaldEventFields { + if elems[0] == eventKey { + p = journalKey + "=" + elems[1] + } + } + + // pass custom fields as is + if p == "" { + p = m + } + + logp.Debug("journal", "Added matcher expression: %s", p) + + err := j.AddMatch(p) + if err != nil { + return fmt.Errorf("error adding match to journal %v", err) + } + + err = j.AddDisjunction() + if err != nil { + return fmt.Errorf("error adding disjunction to journal: %v", err) + } + } + return nil +} + // seek seeks to the position determined by the coniguration and cursor state. func (r *Reader) seek(cursor string) { if r.config.Seek == "cursor" { @@ -213,12 +268,23 @@ func (r *Reader) readUntilNotNull(entries chan<- *beat.Event) error { // toEvent creates a beat.Event from journal entries. func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { fields := common.MapStr{} - for journalKey, eventKey := range journaldEventFields { - if entry.Fields[journalKey] != "" { - fields.Put(eventKey, entry.Fields[journalKey]) + custom := common.MapStr{} + + for k, v := range entry.Fields { + if kk, ok := journaldEventFields[k]; !ok { + normalized := strings.ToLower(strings.TrimLeft(k, "_")) + custom.Put(normalized, v) + } else { + if isKept(kk) { + fields.Put(kk, v) + } } } + if len(custom) != 0 { + fields["custom"] = custom + } + state := checkpoint.JournalState{ Path: r.config.Path, Cursor: entry.Cursor, @@ -226,14 +292,21 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event { MonotonicTimestamp: entry.MonotonicTimestamp, } + fields["read_timestamp"] = time.Now() + receivedByJournal := time.Unix(0, int64(entry.RealtimeTimestamp)*1000) + event := beat.Event{ - Timestamp: time.Now(), + Timestamp: receivedByJournal, Fields: fields, Private: state, } return &event } +func isKept(key string) bool { + return key != "" +} + // stopOrWait waits for a journal event. func (r *Reader) stopOrWait() { select { @@ -260,5 +333,6 @@ func (r *Reader) wait() { // Close closes the underlying journal reader. func (r *Reader) Close() { + instance.StopMonitoringJournal(r.config.Path) r.journal.Close() } diff --git a/journalbeat/reader/journal_test.go b/journalbeat/reader/journal_test.go new file mode 100644 index 000000000000..2e7da74aa9a6 --- /dev/null +++ b/journalbeat/reader/journal_test.go @@ -0,0 +1,123 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package reader + +import ( + "reflect" + "testing" + + "github.com/coreos/go-systemd/sdjournal" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/journalbeat/checkpoint" + "github.com/elastic/beats/journalbeat/cmd/instance" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" +) + +type ToEventTestCase struct { + entry sdjournal.JournalEntry + expectedFields common.MapStr +} + +type SetupMatchesTestCase struct { + matches []string + expectError bool +} + +func TestToEvent(t *testing.T) { + tests := []ToEventTestCase{ + // field name from fields.go + ToEventTestCase{ + entry: sdjournal.JournalEntry{ + Fields: map[string]string{ + sdjournal.SD_JOURNAL_FIELD_BOOT_ID: "123456", + }, + }, + expectedFields: common.MapStr{ + "host": common.MapStr{ + "boot_id": "123456", + }, + }, + }, + // custom field + ToEventTestCase{ + entry: sdjournal.JournalEntry{ + Fields: map[string]string{ + "my_custom_field": "value", + }, + }, + expectedFields: common.MapStr{ + "custom": common.MapStr{ + "my_custom_field": "value", + }, + }, + }, + // dropped field + ToEventTestCase{ + entry: sdjournal.JournalEntry{ + Fields: map[string]string{ + "_SOURCE_MONOTONIC_TIMESTAMP": "value", + }, + }, + expectedFields: common.MapStr{}, + }, + } + + instance.SetupJournalMetrics() + r, err := NewLocal(Config{Path: "dummy.journal"}, nil, checkpoint.JournalState{}, logp.NewLogger("test")) + if err != nil { + t.Fatalf("error creating test journal: %v", err) + } + for _, test := range tests { + event := r.toEvent(&test.entry) + delete(event.Fields, "read_timestamp") + assert.True(t, reflect.DeepEqual(event.Fields, test.expectedFields)) + } +} + +func TestSetupMatches(t *testing.T) { + tests := []SetupMatchesTestCase{ + // correct filter expression + SetupMatchesTestCase{ + matches: []string{"systemd.unit=nginx"}, + expectError: false, + }, + // custom field + SetupMatchesTestCase{ + matches: []string{"_MY_CUSTOM_FIELD=value"}, + expectError: false, + }, + // incorrect separator + SetupMatchesTestCase{ + matches: []string{"systemd.unit~nginx"}, + expectError: true, + }, + } + journal, err := sdjournal.NewJournal() + if err != nil { + t.Fatalf("error while creating test journal: %v", err) + } + + for _, test := range tests { + err = setupMatches(journal, test.matches) + if err != nil && !test.expectError { + t.Errorf("unexpected outcome of setupMatches: error: '%v', expected error: %v", err, test.expectError) + } + } +} diff --git a/journalbeat/tests/system/config/journalbeat.yml.j2 b/journalbeat/tests/system/config/journalbeat.yml.j2 index e2cefdcb6d02..ca8cc1d862cf 100644 --- a/journalbeat/tests/system/config/journalbeat.yml.j2 +++ b/journalbeat/tests/system/config/journalbeat.yml.j2 @@ -2,6 +2,7 @@ journalbeat.inputs: - paths: [{{ journal_path }}] seek: {{ seek_method }} + matches: [{{ matches }}] journalbeat.registry: {{ registry_file }} diff --git a/journalbeat/tests/system/test_base.py b/journalbeat/tests/system/test_base.py index 7b7d16aefb4e..be68e6c08c3b 100644 --- a/journalbeat/tests/system/test_base.py +++ b/journalbeat/tests/system/test_base.py @@ -109,6 +109,39 @@ def test_read_events_with_existing_registry(self): exit_code = journalbeat_proc.kill_and_wait() assert exit_code == 0 + @unittest.skipUnless(sys.platform.startswith("linux"), "Journald only on Linux") + def test_read_events_with_existing_registry(self): + """ + Journalbeat is able to pass matchers to the journal reader and read filtered messages. + """ + + self.render_config_template( + journal_path=self.beat_path + "/tests/system/input/test.journal", + seek_method="head", + matches="syslog.priority=5", + path=os.path.abspath(self.working_dir) + "/log/*", + ) + journalbeat_proc = self.start_beat() + + required_log_snippets = [ + # journalbeat can be started + "journalbeat is running", + # journalbeat can seek to the position defined in the cursor + "Added matcher expression", + # message can be read from test journal + "unhandled HKEY event 0x60b0", + "please report the conditions when this event happened to", + "unhandled HKEY event 0x60b1", + # Four events with priority 5 is publised + "journalbeat successfully published 4 events", + ] + for snippet in required_log_snippets: + self.wait_until(lambda: self.log_contains(snippet), + name="Line in '{}' Journalbeat log".format(snippet)) + + exit_code = journalbeat_proc.kill_and_wait() + assert exit_code == 0 + if __name__ == '__main__': unittest.main()