Skip to content

Commit

Permalink
New decode_csv_fields processor (#11753)
Browse files Browse the repository at this point in the history
This patch introduces a new processor, `decode_csv_fields`, that decodes
rows of CSV-formatted data into a string array, one element per column.

processors:
- decode_csv_fields:
    fields:
      message: csv
    separator: ,
    overwrite_keys: false
    ignore_missing: false
    trim_leading_space: false
  • Loading branch information
adriansr authored Apr 26, 2019
1 parent 50f6315 commit e03993f
Show file tree
Hide file tree
Showing 19 changed files with 669 additions and 38 deletions.
1 change: 1 addition & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (

// Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processor/add_kubernetes_metadata"
_ "github.com/elastic/beats/libbeat/processors/decode_csv_fields"
)

const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" +
Expand Down
55 changes: 53 additions & 2 deletions filebeat/tests/system/test_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,57 @@ def test_truncate_characters(self):
u"This is OK",
])

def test_decode_csv_fields_defaults(self):
"""
Check CSV decoding using defaults
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
processors=[{
"decode_csv_fields": {
"fields": {
"message": "csv"
}
},
}]
)

self._init_and_read_test_input([
u"42,\"string with \"\"quotes\"\"\"\n",
u",\n"
])

self._assert_expected_lines([
["42", "string with \"quotes\""],
["", ""]
], field="csv")

def test_decode_csv_fields_all_options(self):
"""
Check CSV decoding with options
"""
self.render_config_template(
path=os.path.abspath(self.working_dir) + "/test.log",
processors=[{
"decode_csv_fields": {
"fields": {
"message": "message"
},
"overwrite_keys": True,
"separator": "\"\t\"",
"trim_leading_space": True,
},
}]
)

self._init_and_read_test_input([
u" 42\t hello world\t \"string\twith tabs and \"broken\" quotes\"\n",
])

self._assert_expected_lines([
["42", "hello world", "string\twith tabs and \"broken\" quotes"],
])

def _init_and_read_test_input(self, input_lines):
with io.open(self.working_dir + "/test.log", "w", encoding="utf-8") as f:
for line in input_lines:
Expand All @@ -258,10 +309,10 @@ def _init_and_read_test_input(self, input_lines):
self.wait_until(lambda: self.output_has(lines=len(input_lines)))
filebeat.check_kill_and_wait()

def _assert_expected_lines(self, expected_lines):
def _assert_expected_lines(self, expected_lines, field="message"):
output = self.read_output()

assert len(output) == len(expected_lines)

for i in range(len(expected_lines)):
assert output[i]["message"] == expected_lines[i]
assert output[i][field] == expected_lines[i]
3 changes: 3 additions & 0 deletions journalbeat/beater/journalbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (

"github.com/elastic/beats/journalbeat/config"
_ "github.com/elastic/beats/journalbeat/include"

// Add dedicated processors
_ "github.com/elastic/beats/libbeat/processors/decode_csv_fields"
)

// Journalbeat instance
Expand Down
48 changes: 48 additions & 0 deletions libbeat/docs/processors-using.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ The supported processors are:
* <<add-process-metadata,`add_process_metadata`>>
* <<add-tags, `add_tags`>>
* <<community-id,`community_id`>>
ifeval::[("{beatname_lc}"=="filebeat") or ("{beatname_lc}"=="journalbeat")]
* <<decode-csv-fields,`decode_csv_fields`>>
endif::[]
* <<decode-json-fields,`decode_json_fields`>>
* <<dissect, `dissect`>>
* <<processor-dns, `dns`>>
Expand Down Expand Up @@ -776,6 +779,51 @@ Adds the environment field to every event:
}
-------------------------------------------------------------------------------

ifeval::[("{beatname_lc}"=="filebeat") or ("{beatname_lc}"=="journalbeat")]
[[decode-csv-fields]]
=== Decode CSV fields

experimental[]

The `decode_csv_fields` processor decodes fields containing records in
comma-separated format (CSV). It will output the values as an array of strings.
This processor is available for Filebeat and Journalbeat.

[source,yaml]
-----------------------------------------------------
processors:
- decode_csv_fields:
fields:
message: decoded.csv
separator: ,
ignore_missing: false
overwrite_keys: true
trim_leading_space: false
fail_on_error: true
-----------------------------------------------------

The `decode_csv_fields` has the following settings:

`fields`:: This is a mapping from the source field containing the CSV data to
the destination field where the decoded array will be written.
`separator`:: (Optional) Character to be used as a column separator.
The default is the comma character. To use a TAB character,
set it to "\t".
`ignore_missing`:: (Optional) Whether to ignore events which lack the source
field. The default is false, which will fail processing of an
event if a source field is missing.
`overwrite_keys`:: (Optional) Whether the target field is overwritten if it
already exists. The default is false, which will fail
processing of an event when the target field already exists.
`trim_leading_space`:: (Optional) Whether extra space after the separator is
trimmed from values. This works even if the separator is also a space.
The default is false.
`fail_on_error`:: (Optional) If set to true, in case of an error the changes to
the event are reverted and the original event is returned. If set to false,
processing continues even if an error occurs during decoding. Default is `true`.

endif::[]

[[decode-json-fields]]
=== Decode JSON fields
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/add_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type addFields struct {
Expand All @@ -36,9 +37,9 @@ const FieldsKey = "fields"

func init() {
processors.RegisterPlugin("add_fields",
configChecked(CreateAddFields,
requireFields(FieldsKey),
allowedFields(FieldsKey, "target", "when")))
checks.ConfigChecked(CreateAddFields,
checks.RequireFields(FieldsKey),
checks.AllowedFields(FieldsKey, "target", "when")))
}

// CreateAddFields constructs an add_fields processor from config.
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/add_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

// LabelsKey is the default target key for the add_labels processor.
const LabelsKey = "labels"

func init() {
processors.RegisterPlugin("add_labels",
configChecked(createAddLabels,
requireFields(LabelsKey),
allowedFields(LabelsKey, "when")))
checks.ConfigChecked(createAddLabels,
checks.RequireFields(LabelsKey),
checks.AllowedFields(LabelsKey, "when")))
}

func createAddLabels(c *common.Config) (processors.Processor, error) {
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/add_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type addTags struct {
Expand All @@ -33,9 +34,9 @@ type addTags struct {

func init() {
processors.RegisterPlugin("add_tags",
configChecked(createAddTags,
requireFields("tags"),
allowedFields("tags", "target", "when")))
checks.ConfigChecked(createAddTags,
checks.RequireFields("tags"),
checks.AllowedFields("tags", "target", "when")))
}

func createAddTags(c *common.Config) (processors.Processor, error) {
Expand Down
5 changes: 3 additions & 2 deletions libbeat/processors/actions/copy_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type copyFields struct {
Expand All @@ -40,8 +41,8 @@ type copyFieldsConfig struct {

func init() {
processors.RegisterPlugin("copy_fields",
configChecked(NewCopyFields,
requireFields("fields"),
checks.ConfigChecked(NewCopyFields,
checks.RequireFields("fields"),
),
)
}
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/decode_json_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/elastic/beats/libbeat/common/jsontransform"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type decodeJSONFields struct {
Expand Down Expand Up @@ -60,9 +61,9 @@ var debug = logp.MakeDebug("filters")

func init() {
processors.RegisterPlugin("decode_json_fields",
configChecked(NewDecodeJSONFields,
requireFields("fields"),
allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when")))
checks.ConfigChecked(NewDecodeJSONFields,
checks.RequireFields("fields"),
checks.AllowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when")))
}

// NewDecodeJSONFields construct a new decode_json_fields processor.
Expand Down
3 changes: 2 additions & 1 deletion libbeat/processors/actions/drop_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type dropEvent struct{}

func init() {
processors.RegisterPlugin("drop_event",
configChecked(newDropEvent, allowedFields("when")))
checks.ConfigChecked(newDropEvent, checks.AllowedFields("when")))
}

var dropEventsSingleton = (*dropEvent)(nil)
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/drop_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type dropFields struct {
Expand All @@ -32,9 +33,9 @@ type dropFields struct {

func init() {
processors.RegisterPlugin("drop_fields",
configChecked(newDropFields,
requireFields("fields"),
allowedFields("fields", "when")))
checks.ConfigChecked(newDropFields,
checks.RequireFields("fields"),
checks.AllowedFields("fields", "when")))
}

func newDropFields(c *common.Config) (processors.Processor, error) {
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/include_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type includeFields struct {
Expand All @@ -34,9 +35,9 @@ type includeFields struct {

func init() {
processors.RegisterPlugin("include_fields",
configChecked(newIncludeFields,
requireFields("fields"),
allowedFields("fields", "when")))
checks.ConfigChecked(newIncludeFields,
checks.RequireFields("fields"),
checks.AllowedFields("fields", "when")))
}

func newIncludeFields(c *common.Config) (processors.Processor, error) {
Expand Down
5 changes: 3 additions & 2 deletions libbeat/processors/actions/rename.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type renameFields struct {
Expand All @@ -45,8 +46,8 @@ type fromTo struct {

func init() {
processors.RegisterPlugin("rename",
configChecked(NewRenameFields,
requireFields("fields")))
checks.ConfigChecked(NewRenameFields,
checks.RequireFields("fields")))
}

// NewRenameFields returns a new rename processor.
Expand Down
7 changes: 4 additions & 3 deletions libbeat/processors/actions/truncate_fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/processors/checks"
)

type truncateFieldsConfig struct {
Expand All @@ -48,9 +49,9 @@ type truncater func(*truncateFields, []byte) ([]byte, bool, error)

func init() {
processors.RegisterPlugin("truncate_fields",
configChecked(NewTruncateFields,
requireFields("fields"),
mutuallyExclusiveRequiredFields("max_bytes", "max_characters"),
checks.ConfigChecked(NewTruncateFields,
checks.RequireFields("fields"),
checks.MutuallyExclusiveRequiredFields("max_bytes", "max_characters"),
),
)
}
Expand Down
Loading

0 comments on commit e03993f

Please sign in to comment.