Skip to content

Commit

Permalink
convert string fields to int && drop if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Oct 16, 2018
1 parent f924878 commit d588faa
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 66 deletions.
122 changes: 64 additions & 58 deletions journalbeat/reader/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,69 +19,75 @@ package reader

import "github.com/coreos/go-systemd/sdjournal"

type FieldConversion struct {
Name string
IsInteger bool
Dropped bool
}

var (
journaldEventFields = map[string]string{
journaldEventFields = map[string]FieldConversion{
// provided by systemd journal
"COREDUMP_UNIT": "journald.coredump.unit",
"COREDUMP_USER_UNIT": "journald.coredump.user_unit",
"OBJECT_AUDIT_LOGINUID": "journald.object.audit.login_uid",
"OBJECT_AUDIT_SESSION": "journald.object.audit.session",
"OBJECT_CMDLINE": "journald.object.cmd",
"OBJECT_COMM": "journald.object.name",
"OBJECT_EXE": "journald.object.executable",
"OBJECT_GID": "journald.object.gid",
"OBJECT_PID": "journald.object.pid",
"OBJECT_SYSTEMD_OWNER_UID": "journald.object.systemd.owner_uid",
"OBJECT_SYSTEMD_SESSION": "journald.object.systemd.session",
"OBJECT_SYSTEMD_UNIT": "journald.object.systemd.unit",
"OBJECT_SYSTEMD_USER_UNIT": "journald.object.systemd.user_unit",
"OBJECT_UID": "journald.object.uid",
"_KERNEL_DEVICE": "journald.kernel.device",
"_KERNEL_SUBSYSTEM": "journald.kernel.subsystem",
"_SYSTEMD_INVOCATION_ID": "systemd.invocation_id",
"_SYSTEMD_USER_SLICE": "systemd.user_slice",
"_UDEV_DEVLINK": "journald.kernel.device_symlinks", // TODO aggregate multiple elements
"_UDEV_DEVNODE": "journald.kernel.device_node_path",
"_UDEV_SYSNAME": "journald.kernel.device_name",
sdjournal.SD_JOURNAL_FIELD_AUDIT_LOGINUID: "process.audit.login_uid",
sdjournal.SD_JOURNAL_FIELD_AUDIT_SESSION: "process.audit.session",
sdjournal.SD_JOURNAL_FIELD_BOOT_ID: "host.boot_id",
sdjournal.SD_JOURNAL_FIELD_CAP_EFFECTIVE: "process.capabilites",
sdjournal.SD_JOURNAL_FIELD_CMDLINE: "process.cmd",
sdjournal.SD_JOURNAL_FIELD_CODE_FILE: "journald.code.file",
sdjournal.SD_JOURNAL_FIELD_CODE_FUNC: "journald.code.func",
sdjournal.SD_JOURNAL_FIELD_CODE_LINE: "journald.code.line",
sdjournal.SD_JOURNAL_FIELD_COMM: "process.name",
sdjournal.SD_JOURNAL_FIELD_EXE: "process.executable",
sdjournal.SD_JOURNAL_FIELD_GID: "process.uid",
sdjournal.SD_JOURNAL_FIELD_HOSTNAME: "host.name",
sdjournal.SD_JOURNAL_FIELD_MACHINE_ID: "host.id",
sdjournal.SD_JOURNAL_FIELD_MESSAGE: "message",
sdjournal.SD_JOURNAL_FIELD_PID: "process.pid",
sdjournal.SD_JOURNAL_FIELD_PRIORITY: "syslog.priority",
sdjournal.SD_JOURNAL_FIELD_SYSLOG_FACILITY: "syslog.facility",
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER: "syslog.identifier",
sdjournal.SD_JOURNAL_FIELD_SYSLOG_PID: "syslog.pid",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_CGROUP: "systemd.cgroup",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_OWNER_UID: "systemd.owner_uid",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_SESSION: "systemd.session",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_SLICE: "systemd.slice",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: "systemd.unit",
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: "systemd.user_unit",
sdjournal.SD_JOURNAL_FIELD_TRANSPORT: "systemd.transport",
sdjournal.SD_JOURNAL_FIELD_UID: "process.uid",
"COREDUMP_UNIT": FieldConversion{"journald.coredump.unit", false, false},
"COREDUMP_USER_UNIT": FieldConversion{"journald.coredump.user_unit", false, false},
"OBJECT_AUDIT_LOGINUID": FieldConversion{"journald.object.audit.login_uid", true, false},
"OBJECT_AUDIT_SESSION": FieldConversion{"journald.object.audit.session", true, false},
"OBJECT_CMDLINE": FieldConversion{"journald.object.cmd", false, false},
"OBJECT_COMM": FieldConversion{"journald.object.name", false, false},
"OBJECT_EXE": FieldConversion{"journald.object.executable", false, false},
"OBJECT_GID": FieldConversion{"journald.object.gid", true, false},
"OBJECT_PID": FieldConversion{"journald.object.pid", true, false},
"OBJECT_SYSTEMD_OWNER_UID": FieldConversion{"journald.object.systemd.owner_uid", true, false},
"OBJECT_SYSTEMD_SESSION": FieldConversion{"journald.object.systemd.session", false, false},
"OBJECT_SYSTEMD_UNIT": FieldConversion{"journald.object.systemd.unit", false, false},
"OBJECT_SYSTEMD_USER_UNIT": FieldConversion{"journald.object.systemd.user_unit", false, false},
"OBJECT_UID": FieldConversion{"journald.object.uid", true, false},
"_KERNEL_DEVICE": FieldConversion{"journald.kernel.device", false, false},
"_KERNEL_SUBSYSTEM": FieldConversion{"journald.kernel.subsystem", false, false},
"_SYSTEMD_INVOCATION_ID": FieldConversion{"systemd.invocation_id", false, false},
"_SYSTEMD_USER_SLICE": FieldConversion{"systemd.user_slice", false, false},
"_UDEV_DEVLINK": FieldConversion{"journald.kernel.device_symlinks", false, false}, // TODO aggregate multiple elements
"_UDEV_DEVNODE": FieldConversion{"journald.kernel.device_node_path", false, false},
"_UDEV_SYSNAME": FieldConversion{"journald.kernel.device_name", false, false},
sdjournal.SD_JOURNAL_FIELD_AUDIT_LOGINUID: FieldConversion{"process.audit.login_uid", true, false},
sdjournal.SD_JOURNAL_FIELD_AUDIT_SESSION: FieldConversion{"process.audit.session", false, false},
sdjournal.SD_JOURNAL_FIELD_BOOT_ID: FieldConversion{"host.boot_id", false, false},
sdjournal.SD_JOURNAL_FIELD_CAP_EFFECTIVE: FieldConversion{"process.capabilites", false, false},
sdjournal.SD_JOURNAL_FIELD_CMDLINE: FieldConversion{"process.cmd", false, false},
sdjournal.SD_JOURNAL_FIELD_CODE_FILE: FieldConversion{"journald.code.file", false, false},
sdjournal.SD_JOURNAL_FIELD_CODE_FUNC: FieldConversion{"journald.code.func", false, false},
sdjournal.SD_JOURNAL_FIELD_CODE_LINE: FieldConversion{"journald.code.line", true, false},
sdjournal.SD_JOURNAL_FIELD_COMM: FieldConversion{"process.name", false, false},
sdjournal.SD_JOURNAL_FIELD_EXE: FieldConversion{"process.executable", false, false},
sdjournal.SD_JOURNAL_FIELD_GID: FieldConversion{"process.uid", true, false},
sdjournal.SD_JOURNAL_FIELD_HOSTNAME: FieldConversion{"host.name", false, false},
sdjournal.SD_JOURNAL_FIELD_MACHINE_ID: FieldConversion{"host.id", true, false},
sdjournal.SD_JOURNAL_FIELD_MESSAGE: FieldConversion{"message", false, false},
sdjournal.SD_JOURNAL_FIELD_PID: FieldConversion{"process.pid", true, false},
sdjournal.SD_JOURNAL_FIELD_PRIORITY: FieldConversion{"syslog.priority", true, false},
sdjournal.SD_JOURNAL_FIELD_SYSLOG_FACILITY: FieldConversion{"syslog.facility", true, false},
sdjournal.SD_JOURNAL_FIELD_SYSLOG_IDENTIFIER: FieldConversion{"syslog.identifier", false, false},
sdjournal.SD_JOURNAL_FIELD_SYSLOG_PID: FieldConversion{"syslog.pid", true, false},
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_CGROUP: FieldConversion{"systemd.cgroup", false, false},
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_OWNER_UID: FieldConversion{"systemd.owner_uid", true, false},
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_SESSION: FieldConversion{"systemd.session", false, false},
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_SLICE: FieldConversion{"systemd.slice", false, false},
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_UNIT: FieldConversion{"systemd.unit", false, false},
sdjournal.SD_JOURNAL_FIELD_SYSTEMD_USER_UNIT: FieldConversion{"systemd.user_unit", false, false},
sdjournal.SD_JOURNAL_FIELD_TRANSPORT: FieldConversion{"systemd.transport", false, false},
sdjournal.SD_JOURNAL_FIELD_UID: FieldConversion{"process.uid", true, false},

// docker journald fields from: https://docs.docker.com/config/containers/logging/journald/
"CONTAINER_ID": "conatiner.id_truncated",
"CONTAINER_ID_FULL": "container.id",
"CONTAINER_NAME": "container.name",
"CONTAINER_TAG": "container.image.tag",
"CONTAINER_PARTIAL_MESSAGE": "container.partial",
"CONTAINER_ID": FieldConversion{"conatiner.id_truncated", false, false},
"CONTAINER_ID_FULL": FieldConversion{"container.id", false, false},
"CONTAINER_NAME": FieldConversion{"container.name", false, false},
"CONTAINER_TAG": FieldConversion{"container.image.tag", false, false},
"CONTAINER_PARTIAL_MESSAGE": FieldConversion{"container.partial", false, false},

// dropped fields
sdjournal.SD_JOURNAL_FIELD_MONOTONIC_TIMESTAMP: "", // saved in the registry
sdjournal.SD_JOURNAL_FIELD_SOURCE_REALTIME_TIMESTAMP: "", // saved in the registry
sdjournal.SD_JOURNAL_FIELD_CURSOR: "", // saved in the registry
"_SOURCE_MONOTONIC_TIMESTAMP": "", // received timestamp stored in @timestamp
sdjournal.SD_JOURNAL_FIELD_MONOTONIC_TIMESTAMP: FieldConversion{"", false, true}, // saved in the registry
sdjournal.SD_JOURNAL_FIELD_SOURCE_REALTIME_TIMESTAMP: FieldConversion{"", false, true}, // saved in the registry
sdjournal.SD_JOURNAL_FIELD_CURSOR: FieldConversion{"", false, true}, // saved in the registry
"_SOURCE_MONOTONIC_TIMESTAMP": FieldConversion{"", false, true}, // received timestamp stored in @timestamp
}
)
23 changes: 15 additions & 8 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -147,8 +148,8 @@ func setupMatches(j *sdjournal.Journal, matches []string) error {
}

var p string
for journalKey, eventKey := range journaldEventFields {
if elems[0] == eventKey {
for journalKey, eventField := range journaldEventFields {
if elems[0] == eventField.Name {
p = journalKey + "=" + elems[1]
}
}
Expand Down Expand Up @@ -248,8 +249,18 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event {
normalized := strings.ToLower(strings.TrimLeft(k, "_"))
custom.Put(normalized, v)
} else {
if isKept(kk) {
fields.Put(kk, v)
if !kk.Dropped {
if kk.IsInteger {
vv, err := strconv.ParseInt(v, 10, 64)
if err != nil {
r.logger.Debug("Failed to convert field: %s %v to int: %v", kk.Name, v, err)
fields.Put(kk.Name, v)
continue
}
fields.Put(kk.Name, vv)
} else {
fields.Put(kk.Name, v)
}
}
}
}
Expand All @@ -276,10 +287,6 @@ func (r *Reader) toEvent(entry *sdjournal.JournalEntry) *beat.Event {
return &event
}

func isKept(key string) bool {
return key != ""
}

func (r *Reader) wait() {
select {
case <-r.done:
Expand Down

0 comments on commit d588faa

Please sign in to comment.