Skip to content

Commit

Permalink
Journald support for System module (#41555)
Browse files Browse the repository at this point in the history
This commit adds journald support for the System module, both filesets now have a `use_journald` variable
that can be set to force using Journald to ingest syslog and auth logs.

The ingest pipelines are updated, now there is an entrypoint pipeline that selects the correct one according to the field
`input.type`.

System tests are also added.
  • Loading branch information
belimawr authored Nov 18, 2024
1 parent 18e256f commit f4b80fd
Show file tree
Hide file tree
Showing 28 changed files with 817 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Make ETW input GA. {pull}41389[41389]
- Added input metrics to GCS input. {issue}36640[36640] {pull}41505[41505]
- Add support for Okta entity analytics provider to collect role and factor data for users. {pull}41460[41460]
- Add support for Journald in the System module. {pull}41555[41555]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]

Expand Down
4 changes: 4 additions & 0 deletions filebeat/docs/include/use-journald.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*`var.use_journald`*::

A boolean that when set to `true` will read logs from Journald. When
Journald is used all events contain the tag `journald`.
2 changes: 2 additions & 0 deletions filebeat/docs/modules/system.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ include::../include/config-option-intro.asciidoc[]
==== `syslog` fileset settings

include::../include/var-paths.asciidoc[]
include::../include/use-journald.asciidoc[]

[float]
==== `auth` fileset settings

include::../include/var-paths.asciidoc[]
include::../include/use-journald.asciidoc[]

*`var.tags`*::

Expand Down
6 changes: 6 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ filebeat.modules:
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Use journald to collect system logs
#var.use_journald: false

# Input configuration (advanced). Any input configuration option
# can be added under this section.
#input:
Expand All @@ -33,6 +36,9 @@ filebeat.modules:
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Use journald to collect auth logs
#var.use_journald: false

# Input configuration (advanced). Any input configuration option
# can be added under this section.
#input:
Expand Down
14 changes: 14 additions & 0 deletions filebeat/module/system/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Journald tests (Debian 12)
The tests for the journald input (currently only used for Debian 12
testing) require journal files (test files ending in `.journal`), those
files are generated using `systemd-journal-remote` (see the [Journald
input README.md](../../input/journald/README.md) for more details).

The source for those journal files are the `.export` files in the test
folder. Those files are the raw output of `journalctl -o export`. They
are added here because journal files format change with different
versions of journald, which can cause `journalclt` to fail reading
them, which leads to test failures. So if tests start failing because
`journalctl` cannot read the journal files as expected, new ones can
easily be generated with the same version of journalctl used on CI
and the original dataset.
6 changes: 6 additions & 0 deletions filebeat/module/system/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Use journald to collect system logs
#var.use_journald: false

# Input configuration (advanced). Any input configuration option
# can be added under this section.
#input:
Expand All @@ -19,6 +22,9 @@
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Use journald to collect auth logs
#var.use_journald: false

# Input configuration (advanced). Any input configuration option
# can be added under this section.
#input:
6 changes: 6 additions & 0 deletions filebeat/module/system/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Use journald to collect system logs
#var.use_journald: false

# Authorization logs
auth:
enabled: false

# Set custom paths for the log files. If left empty,
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Use journald to collect auth logs
#var.use_journald: false
2 changes: 2 additions & 0 deletions filebeat/module/system/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ include::../include/config-option-intro.asciidoc[]
==== `syslog` fileset settings

include::../include/var-paths.asciidoc[]
include::../include/use-journald.asciidoc[]

[float]
==== `auth` fileset settings

include::../include/var-paths.asciidoc[]
include::../include/use-journald.asciidoc[]

*`var.tags`*::

Expand Down
10 changes: 9 additions & 1 deletion filebeat/module/system/auth/config/auth.yml
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
{{ if .use_journald }}
type: journald
id: system-auth
facilities:
- 4
- 10
{{ else }}
type: log
paths:
{{ range $i, $path := .paths }}
- {{$path}}
{{ end }}
exclude_files: [".gz$"]

multiline:
pattern: "^\\s"
match: after
{{ end }}

# Common configuration
processors:
- add_locale: ~

Expand Down
9 changes: 9 additions & 0 deletions filebeat/module/system/auth/ingest/entrypoint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
description: Entrypoint Pipeline for system/auth Filebeat module
processors:
- pipeline:
if: ctx?.input?.type == "journald"
name: '{< IngestPipeline "journald" >}'

- pipeline:
if: ctx?.input?.type == "log"
name: '{< IngestPipeline "files" >}'
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,9 @@ processors:
TIMESTAMP: (?:%{TIMESTAMP_ISO8601}|%{SYSLOGTIMESTAMP})
patterns:
- '^%{TIMESTAMP:system.auth.timestamp} %{SYSLOGHOST:host.hostname}? %{DATA:process.name}(?:\[%{POSINT:process.pid:long}\])?:%{SPACE}%{GREEDYMULTILINE:_temp.message}$'
- grok:
- pipeline:
description: Grok specific auth messages.
tag: grok-specific-messages
field: _temp.message
ignore_missing: true
patterns:
- '^%{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user)?%{DATA:user.name} from %{IPORHOST:source.address} port %{NUMBER:source.port:long} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?'
- '^%{DATA:system.auth.ssh.event} user %{DATA:user.name} from %{IPORHOST:source.address}'
- '^Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}'
- '^%{DATA:user.name} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}'
- '^new group: name=%{DATA:group.name}, GID=%{NUMBER:group.id}'
- '^new user: name=%{DATA:user.name}, UID=%{NUMBER:user.id}, GID=%{NUMBER:group.id}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$'
name: '{< IngestPipeline "grok-auth-messages" >}'
on_failure:
- rename:
description: Leave the unmatched content in message.
Expand Down
14 changes: 14 additions & 0 deletions filebeat/module/system/auth/ingest/grok-auth-messages.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
description: Journald Pipeline for system/auth Filebeat module
processors:
- grok:
description: Grok specific auth messages.
tag: grok-specific-messages
field: _temp.message
ignore_missing: true
patterns:
- '^%{DATA:system.auth.ssh.event} %{DATA:system.auth.ssh.method} for (invalid user)?%{DATA:user.name} from %{IPORHOST:source.address} port %{NUMBER:source.port:long} ssh2(: %{GREEDYDATA:system.auth.ssh.signature})?'
- '^%{DATA:system.auth.ssh.event} user %{DATA:user.name} from %{IPORHOST:source.address}'
- '^Did not receive identification string from %{IPORHOST:system.auth.ssh.dropped_ip}'
- '^%{DATA:user.name} :( %{DATA:system.auth.sudo.error} ;)? TTY=%{DATA:system.auth.sudo.tty} ; PWD=%{DATA:system.auth.sudo.pwd} ; USER=%{DATA:system.auth.sudo.user} ; COMMAND=%{GREEDYDATA:system.auth.sudo.command}'
- '^new group: name=%{DATA:group.name}, GID=%{NUMBER:group.id}'
- '^new user: name=%{DATA:user.name}, UID=%{NUMBER:user.id}, GID=%{NUMBER:group.id}, home=%{DATA:system.auth.useradd.home}, shell=%{DATA:system.auth.useradd.shell}$'
205 changes: 205 additions & 0 deletions filebeat/module/system/auth/ingest/journald.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
description: Journald Pipeline for system/auth Filebeat module
processors:
- set:
field: event.ingested
copy_from: _ingest.timestamp
- rename:
field: "journald.process.name"
target_field: process.name
- set:
field: "process.pid"
copy_from: "journald.pid"
ignore_failure: true
- rename:
field: message
target_field: _temp.message
- pipeline:
description: Grok specific auth messages.
name: '{< IngestPipeline "grok-auth-messages" >}'
ignore_failure: true
- rename:
field: _temp.message
target_field: message
- grok:
description: Grok usernames from PAM messages.
tag: grok-pam-users
field: message
ignore_missing: true
ignore_failure: true
patterns:
- 'for user %{QUOTE}?%{DATA:_temp.foruser}%{QUOTE}? by %{QUOTE}?%{DATA:_temp.byuser}%{QUOTE}?(?:\(uid=%{NUMBER:_temp.byuid}\))?$'
- 'for user %{QUOTE}?%{DATA:_temp.foruser}%{QUOTE}?$'
- 'by user %{QUOTE}?%{DATA:_temp.byuser}%{QUOTE}?$'
- '%{BOUNDARY} user %{QUOTE}%{DATA:_temp.user}%{QUOTE}'
pattern_definitions:
QUOTE: "['\"]"
BOUNDARY: "(?<! )"
if: ctx.message != null && ctx.message != ""
- rename:
field: _temp.byuser
target_field: user.name
ignore_missing: true
ignore_failure: true
- rename:
field: _temp.byuid
target_field: user.id
ignore_missing: true
ignore_failure: true
- rename:
field: _temp.foruser
target_field: user.name
ignore_missing: true
ignore_failure: true
if: ctx.user?.name == null || ctx.user?.name == ""
- rename:
field: _temp.user
target_field: user.name
ignore_missing: true
ignore_failure: true
if: ctx.user?.name == null || ctx.user?.name == ""
- rename:
field: _temp.foruser
target_field: user.effective.name
ignore_missing: true
ignore_failure: true
if: ctx.user?.name != null
- remove:
field: _temp
ignore_missing: true
- convert:
field: source.address
target_field: source.ip
type: ip
ignore_missing: true
on_failure:
- set:
field: source.domain
copy_from: source.address
ignore_failure: true
- convert:
field: system.auth.sudo.user
target_field: user.effective.name
type: string
ignore_failure: true
if: ctx.system?.auth?.sudo?.user != null
- convert:
field: system.auth.ssh.dropped_ip
target_field: source.ip
type: ip
ignore_missing: true
on_failure:
- remove:
field: system.auth.ssh.dropped_ip
- geoip:
field: source.ip
target_field: source.geo
ignore_missing: true
- geoip:
database_file: GeoLite2-ASN.mmdb
field: source.ip
target_field: source.as
properties:
- asn
- organization_name
ignore_missing: true
- rename:
field: source.as.asn
target_field: source.as.number
ignore_missing: true
- rename:
field: source.as.organization_name
target_field: source.as.organization.name
ignore_missing: true
- set:
field: event.kind
value: event
- script:
description: Add event.category/action/output to SSH events.
tag: script-categorize-ssh-event
if: ctx.system?.auth?.ssh?.event != null
lang: painless
source: >-
if (ctx.system.auth.ssh.event == "Accepted") {
ctx.event.type = ["info"];
ctx.event.category = ["authentication", "session"];
ctx.event.action = "ssh_login";
ctx.event.outcome = "success";
} else if (ctx.system.auth.ssh.event == "Invalid" || ctx.system.auth.ssh.event == "Failed") {
ctx.event.type = ["info"];
ctx.event.category = ["authentication"];
ctx.event.action = "ssh_login";
ctx.event.outcome = "failure";
}
- append:
field: event.category
value: iam
if: ctx.process?.name != null && ['groupadd', 'groupdel', 'groupmod', 'useradd', 'userdel', 'usermod'].contains(ctx.process.name)
- set:
field: event.outcome
value: success
if: ctx.process?.name != null && (ctx.message == null || !ctx.message.contains("fail")) && ['groupadd', 'groupdel', 'groupmod', 'useradd', 'userdel', 'usermod'].contains(ctx.process.name)
- set:
field: event.outcome
value: failure
if: ctx.process?.name != null && (ctx.message != null && ctx.message.contains("fail")) && ['groupadd', 'groupdel', 'groupmod', 'useradd', 'userdel', 'usermod'].contains(ctx.process.name)
- append:
field: event.type
value: user
if: ctx.process?.name != null && ['useradd', 'userdel', 'usermod'].contains(ctx.process.name)
- append:
field: event.type
value: group
if: ctx.process?.name != null && ['groupadd', 'groupdel', 'groupmod'].contains(ctx.process.name)
- append:
field: event.type
value: creation
if: ctx.process?.name != null && ['useradd', 'groupadd'].contains(ctx.process.name)
- append:
field: event.type
value: deletion
if: ctx.process?.name != null && ['userdel', 'groupdel'].contains(ctx.process.name)
- append:
field: event.type
value: change
if: ctx.process?.name != null && ['usermod', 'groupmod'].contains(ctx.process.name)
- append:
field: related.user
value: "{{{ user.name }}}"
allow_duplicates: false
if: ctx.user?.name != null && ctx.user?.name != ''
- append:
field: related.user
value: "{{{ user.effective.name }}}"
allow_duplicates: false
if: ctx.user?.effective?.name != null && ctx.user?.effective?.name != ''
- append:
field: related.ip
value: "{{{ source.ip }}}"
allow_duplicates: false
if: ctx.source?.ip != null && ctx.source?.ip != ''
- append:
field: related.hosts
value: "{{{ host.hostname }}}"
allow_duplicates: false
if: ctx.host?.hostname != null && ctx.host?.hostname != ''
- set:
field: ecs.version
value: 8.0.0
- remove:
field: event.original
if: "ctx?.tags == null || !(ctx.tags.contains('preserve_original_event'))"
ignore_failure: true
ignore_missing: true
- remove:
description: Remove the extra fields added by the Journald input
ignore_missing: true
field:
- journald
- process.thread
- syslog
- systemd
- message_id
on_failure:
- set:
field: error.message
value: '{{{ _ingest.on_failure_message }}}'
Loading

0 comments on commit f4b80fd

Please sign in to comment.