From e03993f9459ddd045f21226f498cdfc86bcfbfae Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 26 Apr 2019 10:13:20 +0200 Subject: [PATCH] New decode_csv_fields processor (#11753) This patch introduces a new processor, `decode_csv_fields` that decodes rows of CSV-formatted data into a string array, one element per column. processors: - decode_csv_fields: fields: message: csv separator: , overwrite_keys: false ignore_missing: false trim_leading_space: false --- filebeat/beater/filebeat.go | 1 + filebeat/tests/system/test_processors.py | 55 ++- journalbeat/beater/journalbeat.go | 3 + libbeat/docs/processors-using.asciidoc | 48 +++ libbeat/processors/actions/add_fields.go | 7 +- libbeat/processors/actions/add_labels.go | 7 +- libbeat/processors/actions/add_tags.go | 7 +- libbeat/processors/actions/copy_fields.go | 5 +- .../processors/actions/decode_json_fields.go | 7 +- libbeat/processors/actions/drop_event.go | 3 +- libbeat/processors/actions/drop_fields.go | 7 +- libbeat/processors/actions/include_fields.go | 7 +- libbeat/processors/actions/rename.go | 5 +- libbeat/processors/actions/truncate_fields.go | 7 +- .../processors/{actions => checks}/checks.go | 17 +- .../{actions => checks}/checks_test.go | 10 +- .../decode_csv_fields/decode_csv_fields.go | 153 ++++++++ .../decode_csv_fields_test.go | 356 ++++++++++++++++++ .../javascript/module/processor/processor.go | 2 + 19 files changed, 669 insertions(+), 38 deletions(-) rename libbeat/processors/{actions => checks}/checks.go (77%) rename libbeat/processors/{actions => checks}/checks_test.go (94%) create mode 100644 libbeat/processors/decode_csv_fields/decode_csv_fields.go create mode 100644 libbeat/processors/decode_csv_fields/decode_csv_fields_test.go diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 6c36061ad52..13f1fda9650 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -47,6 +47,7 @@ import ( // Add filebeat level processors _ "github.com/elastic/beats/filebeat/processor/add_kubernetes_metadata" + _ "github.com/elastic/beats/libbeat/processors/decode_csv_fields" ) const pipelinesWarning = "Filebeat is unable to load the Ingest Node pipelines for the configured" + diff --git a/filebeat/tests/system/test_processors.py b/filebeat/tests/system/test_processors.py index 5450d80a355..c6514986120 100644 --- a/filebeat/tests/system/test_processors.py +++ b/filebeat/tests/system/test_processors.py @@ -249,6 +249,57 @@ def test_truncate_characters(self): u"This is OK", ]) + def test_decode_csv_fields_defaults(self): + """ + Check CSV decoding using defaults + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/test.log", + processors=[{ + "decode_csv_fields": { + "fields": { + "message": "csv" + } + }, + }] + ) + + self._init_and_read_test_input([ + u"42,\"string with \"\"quotes\"\"\"\n", + u",\n" + ]) + + self._assert_expected_lines([ + ["42", "string with \"quotes\""], + ["", ""] + ], field="csv") + + def test_decode_csv_fields_all_options(self): + """ + Check CSV decoding with options + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/test.log", + processors=[{ + "decode_csv_fields": { + "fields": { + "message": "message" + }, + "overwrite_keys": True, + "separator": "\"\t\"", + "trim_leading_space": True, + }, + }] + ) + + self._init_and_read_test_input([ + u" 42\t hello world\t \"string\twith tabs and \"broken\" quotes\"\n", + ]) + + self._assert_expected_lines([ + ["42", "hello world", "string\twith tabs and \"broken\" quotes"], + ]) + def _init_and_read_test_input(self, input_lines): with io.open(self.working_dir + "/test.log", "w", encoding="utf-8") as f: for line in input_lines: @@ -258,10 +309,10 @@ def _init_and_read_test_input(self, input_lines): self.wait_until(lambda: self.output_has(lines=len(input_lines))) filebeat.check_kill_and_wait() - def _assert_expected_lines(self, expected_lines): + def _assert_expected_lines(self, expected_lines, field="message"): output = self.read_output() assert len(output) == len(expected_lines) for i in range(len(expected_lines)): - assert output[i]["message"] == expected_lines[i] + assert output[i][field] == expected_lines[i] diff --git a/journalbeat/beater/journalbeat.go b/journalbeat/beater/journalbeat.go index 196811fb522..aefe6f8abe1 100644 --- a/journalbeat/beater/journalbeat.go +++ b/journalbeat/beater/journalbeat.go @@ -32,6 +32,9 @@ import ( "github.com/elastic/beats/journalbeat/config" _ "github.com/elastic/beats/journalbeat/include" + + // Add dedicated processors + _ "github.com/elastic/beats/libbeat/processors/decode_csv_fields" ) // Journalbeat instance diff --git a/libbeat/docs/processors-using.asciidoc b/libbeat/docs/processors-using.asciidoc index 9a44d923fc5..12bebe2d91c 100644 --- a/libbeat/docs/processors-using.asciidoc +++ b/libbeat/docs/processors-using.asciidoc @@ -205,6 +205,9 @@ The supported processors are: * <> * <> * <> +ifeval::[("{beatname_lc}"=="filebeat") or ("{beatname_lc}"=="journalbeat")] +* <> +endif::[] * <> * <> * <> @@ -776,6 +779,51 @@ Adds the environment field to every event: } ------------------------------------------------------------------------------- +ifeval::[("{beatname_lc}"=="filebeat") or ("{beatname_lc}"=="journalbeat")] +* <> +[[decode-csv-fields]] +=== Decode CSV fields + +experimental[] + +The `decode_csv_fields` processor decodes fields containing records in +comma-separated format (CSV). It will output the values as an array of strings. +This processor is available for Filebeat and Journalbeat. + +[source,yaml] +----------------------------------------------------- +processors: + - decode_csv_fields: + fields: + message: decoded.csv + separator: , + ignore_missing: false + overwrite_keys: true + trim_leading_whitespace: false + fail_on_error: true +----------------------------------------------------- + +The `decode_csv_fields` has the following settings: + +`fields`:: This is a mapping from the source field containing the CSV data to + the destination field where the decoded array will be written. +`separator`:: (Optional) Character to be used as a column separator. + The default is the comma character. For using a TAB character you + must set it to "\t". +`ignore_missing`:: (Optional) Whether to ignore events which lack the source + field. The default is false, which will fail processing of an + event if `target` field is missing. +`overwrite_keys`:: Whether the target field is overwritten if it + already exists. The default is false, which will fail + processing of an event when `target` already exists. +`trim_leading_space`:: Whether extra space after the separator is trimmed from + values. This works even if the separator is also a space. + The default is false. +`fail_on_error`:: (Optional) If set to true, in case of an error the changes to +the event are reverted and the original event is returned. If set to false, +processing continues also if an error happened during renaming. Default is `true`. + +endif::[] [[decode-json-fields]] === Decode JSON fields diff --git a/libbeat/processors/actions/add_fields.go b/libbeat/processors/actions/add_fields.go index 710810d2a52..22e2fd02fe4 100644 --- a/libbeat/processors/actions/add_fields.go +++ b/libbeat/processors/actions/add_fields.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) type addFields struct { @@ -36,9 +37,9 @@ const FieldsKey = "fields" func init() { processors.RegisterPlugin("add_fields", - configChecked(CreateAddFields, - requireFields(FieldsKey), - allowedFields(FieldsKey, "target", "when"))) + checks.ConfigChecked(CreateAddFields, + checks.RequireFields(FieldsKey), + checks.AllowedFields(FieldsKey, "target", "when"))) } // CreateAddFields constructs an add_fields processor from config. diff --git a/libbeat/processors/actions/add_labels.go b/libbeat/processors/actions/add_labels.go index 42befc53771..de6f4cc9011 100644 --- a/libbeat/processors/actions/add_labels.go +++ b/libbeat/processors/actions/add_labels.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) // LabelsKey is the default target key for the add_labels processor. @@ -29,9 +30,9 @@ const LabelsKey = "labels" func init() { processors.RegisterPlugin("add_labels", - configChecked(createAddLabels, - requireFields(LabelsKey), - allowedFields(LabelsKey, "when"))) + checks.ConfigChecked(createAddLabels, + checks.RequireFields(LabelsKey), + checks.AllowedFields(LabelsKey, "when"))) } func createAddLabels(c *common.Config) (processors.Processor, error) { diff --git a/libbeat/processors/actions/add_tags.go b/libbeat/processors/actions/add_tags.go index 95db6dea973..d8e6cd9811c 100644 --- a/libbeat/processors/actions/add_tags.go +++ b/libbeat/processors/actions/add_tags.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) type addTags struct { @@ -33,9 +34,9 @@ type addTags struct { func init() { processors.RegisterPlugin("add_tags", - configChecked(createAddTags, - requireFields("tags"), - allowedFields("tags", "target", "when"))) + checks.ConfigChecked(createAddTags, + checks.RequireFields("tags"), + checks.AllowedFields("tags", "target", "when"))) } func createAddTags(c *common.Config) (processors.Processor, error) { diff --git a/libbeat/processors/actions/copy_fields.go b/libbeat/processors/actions/copy_fields.go index c57447ac6ee..ec82cfa3e02 100644 --- a/libbeat/processors/actions/copy_fields.go +++ b/libbeat/processors/actions/copy_fields.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) type copyFields struct { @@ -40,8 +41,8 @@ type copyFieldsConfig struct { func init() { processors.RegisterPlugin("copy_fields", - configChecked(NewCopyFields, - requireFields("fields"), + checks.ConfigChecked(NewCopyFields, + checks.RequireFields("fields"), ), ) } diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index 74d7311f1aa..e4aba502dbf 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/beats/libbeat/common/jsontransform" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) type decodeJSONFields struct { @@ -60,9 +61,9 @@ var debug = logp.MakeDebug("filters") func init() { processors.RegisterPlugin("decode_json_fields", - configChecked(NewDecodeJSONFields, - requireFields("fields"), - allowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when"))) + checks.ConfigChecked(NewDecodeJSONFields, + checks.RequireFields("fields"), + checks.AllowedFields("fields", "max_depth", "overwrite_keys", "process_array", "target", "when"))) } // NewDecodeJSONFields construct a new decode_json_fields processor. diff --git a/libbeat/processors/actions/drop_event.go b/libbeat/processors/actions/drop_event.go index b6b735112e6..a24db8fc3f1 100644 --- a/libbeat/processors/actions/drop_event.go +++ b/libbeat/processors/actions/drop_event.go @@ -21,13 +21,14 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) type dropEvent struct{} func init() { processors.RegisterPlugin("drop_event", - configChecked(newDropEvent, allowedFields("when"))) + checks.ConfigChecked(newDropEvent, checks.AllowedFields("when"))) } var dropEventsSingleton = (*dropEvent)(nil) diff --git a/libbeat/processors/actions/drop_fields.go b/libbeat/processors/actions/drop_fields.go index f3c528728fc..051170a31f0 100644 --- a/libbeat/processors/actions/drop_fields.go +++ b/libbeat/processors/actions/drop_fields.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) type dropFields struct { @@ -32,9 +33,9 @@ type dropFields struct { func init() { processors.RegisterPlugin("drop_fields", - configChecked(newDropFields, - requireFields("fields"), - allowedFields("fields", "when"))) + checks.ConfigChecked(newDropFields, + checks.RequireFields("fields"), + checks.AllowedFields("fields", "when"))) } func newDropFields(c *common.Config) (processors.Processor, error) { diff --git a/libbeat/processors/actions/include_fields.go b/libbeat/processors/actions/include_fields.go index 1ce3f234058..ada8d300bef 100644 --- a/libbeat/processors/actions/include_fields.go +++ b/libbeat/processors/actions/include_fields.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) type includeFields struct { @@ -34,9 +35,9 @@ type includeFields struct { func init() { processors.RegisterPlugin("include_fields", - configChecked(newIncludeFields, - requireFields("fields"), - allowedFields("fields", "when"))) + checks.ConfigChecked(newIncludeFields, + checks.RequireFields("fields"), + checks.AllowedFields("fields", "when"))) } func newIncludeFields(c *common.Config) (processors.Processor, error) { diff --git a/libbeat/processors/actions/rename.go b/libbeat/processors/actions/rename.go index 2fc578e3cb8..2a94b9f834e 100644 --- a/libbeat/processors/actions/rename.go +++ b/libbeat/processors/actions/rename.go @@ -26,6 +26,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) type renameFields struct { @@ -45,8 +46,8 @@ type fromTo struct { func init() { processors.RegisterPlugin("rename", - configChecked(NewRenameFields, - requireFields("fields"))) + checks.ConfigChecked(NewRenameFields, + checks.RequireFields("fields"))) } // NewRenameFields returns a new rename processor. diff --git a/libbeat/processors/actions/truncate_fields.go b/libbeat/processors/actions/truncate_fields.go index 70244b2cfa1..d924da381a5 100644 --- a/libbeat/processors/actions/truncate_fields.go +++ b/libbeat/processors/actions/truncate_fields.go @@ -29,6 +29,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" ) type truncateFieldsConfig struct { @@ -48,9 +49,9 @@ type truncater func(*truncateFields, []byte) ([]byte, bool, error) func init() { processors.RegisterPlugin("truncate_fields", - configChecked(NewTruncateFields, - requireFields("fields"), - mutuallyExclusiveRequiredFields("max_bytes", "max_characters"), + checks.ConfigChecked(NewTruncateFields, + checks.RequireFields("fields"), + checks.MutuallyExclusiveRequiredFields("max_bytes", "max_characters"), ), ) } diff --git a/libbeat/processors/actions/checks.go b/libbeat/processors/checks/checks.go similarity index 77% rename from libbeat/processors/actions/checks.go rename to libbeat/processors/checks/checks.go index d554409cd9d..d275278544b 100644 --- a/libbeat/processors/actions/checks.go +++ b/libbeat/processors/checks/checks.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package actions +package checks import ( "fmt" @@ -24,7 +24,9 @@ import ( "github.com/elastic/beats/libbeat/processors" ) -func configChecked( +// ConfigChecked returns a wrapper that will validate the configuration using +// the passed checks before invoking the original constructor. +func ConfigChecked( constr processors.Constructor, checks ...func(*common.Config) error, ) processors.Constructor { @@ -50,7 +52,8 @@ func checkAll(checks ...func(*common.Config) error) func(*common.Config) error { } } -func requireFields(fields ...string) func(*common.Config) error { +// RequireFields checks that the required fields are present in the configuration. +func RequireFields(fields ...string) func(*common.Config) error { return func(cfg *common.Config) error { for _, field := range fields { if !cfg.HasField(field) { @@ -61,7 +64,8 @@ func requireFields(fields ...string) func(*common.Config) error { } } -func allowedFields(fields ...string) func(*common.Config) error { +// AllowedFields checks that only allowed fields are used in the configuration. +func AllowedFields(fields ...string) func(*common.Config) error { return func(cfg *common.Config) error { for _, field := range cfg.GetFields() { found := false @@ -80,7 +84,10 @@ func allowedFields(fields ...string) func(*common.Config) error { } } -func mutuallyExclusiveRequiredFields(fields ...string) func(*common.Config) error { +// MutuallyExclusiveRequiredFields checks that only one of the given +// fields is used at the same time. It is an error for none of the fields to be +// present. +func MutuallyExclusiveRequiredFields(fields ...string) func(*common.Config) error { return func(cfg *common.Config) error { var foundField string for _, field := range cfg.GetFields() { diff --git a/libbeat/processors/actions/checks_test.go b/libbeat/processors/checks/checks_test.go similarity index 94% rename from libbeat/processors/actions/checks_test.go rename to libbeat/processors/checks/checks_test.go index ef8a2368a28..656f922fccc 100644 --- a/libbeat/processors/actions/checks_test.go +++ b/libbeat/processors/checks/checks_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package actions +package checks import ( "testing" @@ -82,7 +82,7 @@ func TestRequiredFields(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - runTest(t, requireFields, test.Config, test.Required, test.Valid) + runTest(t, RequireFields, test.Config, test.Required, test.Valid) }) } } @@ -127,7 +127,7 @@ func TestAllowedFields(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - runTest(t, allowedFields, test.Config, test.Allowed, test.Valid) + runTest(t, AllowedFields, test.Config, test.Allowed, test.Valid) }) } } @@ -173,7 +173,7 @@ func TestMutuallyExclusiveRequiredFields(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - runTest(t, mutuallyExclusiveRequiredFields, test.Config, test.MutuallyExclusive, test.Valid) + runTest(t, MutuallyExclusiveRequiredFields, test.Config, test.MutuallyExclusive, test.Valid) }) } } @@ -189,7 +189,7 @@ func runTest( if err != nil { t.Fatalf("Unexpected error while creating configuration: %+v\n", err) } - factory := configChecked(newMock, check(fields...)) + factory := ConfigChecked(newMock, check(fields...)) _, err = factory(cfg) if err != nil && valid { diff --git a/libbeat/processors/decode_csv_fields/decode_csv_fields.go b/libbeat/processors/decode_csv_fields/decode_csv_fields.go new file mode 100644 index 00000000000..a23952098b0 --- /dev/null +++ b/libbeat/processors/decode_csv_fields/decode_csv_fields.go @@ -0,0 +1,153 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package decode_csv_fields + +import ( + "encoding/csv" + "encoding/json" + "fmt" + "strings" + + "github.com/pkg/errors" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/processors/checks" +) + +type decodeCSVFields struct { + csvConfig + fields map[string]string + separator rune +} + +type csvConfig struct { + Fields common.MapStr `config:"fields"` + IgnoreMissing bool `config:"ignore_missing"` + TrimLeadingSpace bool `config:"trim_leading_space"` + OverwriteKeys bool `config:"overwrite_keys"` + FailOnError bool `config:"fail_on_error"` + Separator string `config:"separator"` +} + +var ( + defaultCSVConfig = csvConfig{ + Separator: ",", + FailOnError: true, + } + + errFieldAlreadySet = errors.New("field already has a value") +) + +func init() { + processors.RegisterPlugin("decode_csv_fields", + checks.ConfigChecked(NewDecodeCSVField, + checks.RequireFields("fields"), + checks.AllowedFields("fields", "ignore_missing", "overwrite_keys", "separator", "trim_leading_space", "overwrite_keys", "fail_on_error"))) +} + +// NewDecodeCSVField construct a new decode_csv_field processor. +func NewDecodeCSVField(c *common.Config) (processors.Processor, error) { + config := defaultCSVConfig + + err := c.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("failed to unpack the decode_csv_field configuration: %s", err) + } + if len(config.Fields) == 0 { + return nil, errors.New("no fields to decode configured") + } + f := &decodeCSVFields{csvConfig: config} + // Set separator as rune + switch runes := []rune(config.Separator); len(runes) { + case 0: + break + case 1: + f.separator = runes[0] + default: + return nil, errors.Errorf("separator must be a single character, got %d in string '%s'", len(runes), config.Separator) + } + // Set fields as string -> string + f.fields = make(map[string]string, len(config.Fields)) + for src, dstIf := range config.Fields.Flatten() { + dst, ok := dstIf.(string) + if !ok { + return nil, errors.Errorf("bad destination mapping for %s: destination field must be string, not %T (got %v)", src, dstIf, dstIf) + } + f.fields[src] = dst + } + return f, nil +} + +// Run applies the decode_csv_field processor to an event. +func (f *decodeCSVFields) Run(event *beat.Event) (*beat.Event, error) { + saved := *event + if f.FailOnError { + saved.Fields = event.Fields.Clone() + saved.Meta = event.Meta.Clone() + } + for src, dest := range f.fields { + if err := f.decodeCSVField(src, dest, event); err != nil && f.FailOnError { + return &saved, err + } + } + return event, nil +} + +func (f *decodeCSVFields) decodeCSVField(src, dest string, event *beat.Event) error { + data, err := event.GetValue(src) + if err != nil { + if f.IgnoreMissing && errors.Cause(err) == common.ErrKeyNotFound { + return nil + } + return errors.Wrapf(err, "could not fetch value for field %s", src) + } + + text, ok := data.(string) + if !ok { + return errors.Errorf("field %s is not of string type", src) + } + + reader := csv.NewReader(strings.NewReader(text)) + reader.Comma = f.separator + reader.TrimLeadingSpace = f.TrimLeadingSpace + // LazyQuotes makes the parser more tolerant to bad string formatting. + reader.LazyQuotes = true + + record, err := reader.Read() + if err != nil { + return errors.Wrapf(err, "error decoding CSV from field %s", src) + } + + if src != dest && !f.OverwriteKeys { + if _, err = event.GetValue(dest); err == nil { + return errors.Errorf("target field %s already has a value. Set the overwrite_keys flag or drop/rename the field first", dest) + } + } + if _, err = event.PutValue(dest, record); err != nil { + return errors.Wrapf(err, "failed setting field %s", dest) + } + return nil +} + +// String returns a string representation of this processor. +func (f decodeCSVFields) String() string { + json, _ := json.Marshal(f.csvConfig) + return "decode_csv_field=" + string(json) +} diff --git a/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go b/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go new file mode 100644 index 00000000000..68a35b6300a --- /dev/null +++ b/libbeat/processors/decode_csv_fields/decode_csv_fields_test.go @@ -0,0 +1,356 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package decode_csv_fields + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/beat" + "github.com/elastic/beats/libbeat/common" +) + +func TestDecodeCSVField(t *testing.T) { + tests := map[string]struct { + config common.MapStr + input beat.Event + expected beat.Event + fail bool + }{ + "self target": { + config: common.MapStr{ + "fields": common.MapStr{ + "message": "message", + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "message": "17,192.168.33.1,8.8.8.8", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "message": []string{"17", "192.168.33.1", "8.8.8.8"}, + }, + }, + }, + + "alternative target": { + config: common.MapStr{ + "fields": common.MapStr{ + "my": common.MapStr{ + "field": "message", + }, + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "my": common.MapStr{ + "field": "17,192.168.33.1,8.8.8.8", + }, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "my.field": "17,192.168.33.1,8.8.8.8", + "message": []string{"17", "192.168.33.1", "8.8.8.8"}, + }, + }, + }, + + "non existing field": { + config: common.MapStr{ + "fields": common.MapStr{ + "field": "my.field", + }, + }, + fail: true, + }, + + "ignore missing": { + config: common.MapStr{ + "fields": common.MapStr{ + "my_field": "my_field", + }, + + "ignore_missing": true, + }, + }, + + "overwrite keys failure": { + config: common.MapStr{ + "fields": common.MapStr{ + "message": "existing_field", + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "message": `"hello ""world"""`, + "existing_field": 42, + }, + }, + fail: true, + }, + + "overwrite keys": { + config: common.MapStr{ + "fields": common.MapStr{ + "message": "existing_field", + }, + "overwrite_keys": true, + }, + input: beat.Event{ + Fields: common.MapStr{ + "message": `"hello ""world"""`, + "existing_field": 42, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "message": `"hello ""world"""`, + "existing_field": []string{`hello "world"`}, + }, + }, + }, + + "custom separator": { + config: common.MapStr{ + "fields": common.MapStr{ + "message": "message", + }, + "separator": ";", + }, + input: beat.Event{ + Fields: common.MapStr{ + "message": "1.5;false;hello world;3", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "message": []string{"1.5", "false", "hello world", "3"}, + }, + }, + }, + + "trim leading space": { + config: common.MapStr{ + "fields": common.MapStr{ + "message": "message", + }, + "trim_leading_space": true, + }, + input: beat.Event{ + Fields: common.MapStr{ + "message": " Here's, some, extra ,whitespace", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "message": []string{"Here's", "some", "extra ", "whitespace"}, + }, + }, + }, + + "tab separator": { + config: common.MapStr{ + "fields": common.MapStr{ + "message": "message", + }, + "separator": "\t", + "overwrite_keys": true, + }, + input: beat.Event{ + Fields: common.MapStr{ + "message": "Tab\tin\tASCII\thas\tthe\t\"decimal\tcharacter\tcode\"\t9", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "message": []string{"Tab", "in", "ASCII", "has", "the", "decimal\tcharacter\tcode", "9"}, + }, + }, + }, + + "unicode separator": { + config: common.MapStr{ + "fields": common.MapStr{ + "message": "message", + }, + "separator": "🍺", + "overwrite_keys": true, + }, + input: beat.Event{ + Fields: common.MapStr{ + "message": `πŸ’πŸΊπŸŒ”πŸˆπŸΊπŸΊπŸ₯🐲`, + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "message": []string{"🐒", "πŸŒ”πŸˆ", "", "πŸ₯🐲"}, + }, + }, + }, + + "bad type": { + config: common.MapStr{ + "fields": common.MapStr{ + "message": "message", + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "message": 42, + }, + }, + fail: true, + }, + + "multiple fields": { + config: common.MapStr{ + "fields": common.MapStr{ + "a": "a_csv", + "b": "b_csv", + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "a": "1,2", + "b": "hello,world", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "a": "1,2", + "b": "hello,world", + "a_csv": []string{"1", "2"}, + "b_csv": []string{"hello", "world"}, + }, + }, + }, + + "multiple fields failure": { + config: common.MapStr{ + "fields": common.MapStr{ + "a": "a.csv", + "b": "b.csv", + }, + }, + input: beat.Event{ + Fields: common.MapStr{ + "a": "1,2", + "b": "hello,world", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "a": "1,2", + "b": "hello,world", + }, + }, + fail: true, + }, + + "ignore errors": { + config: common.MapStr{ + "fields": common.MapStr{ + "a": "a", + "b": "b", + "c": "a.b", + }, + "fail_on_error": false, + }, + input: beat.Event{ + Fields: common.MapStr{ + "a": "1,2", + "b": "hello,world", + "c": ":)", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "a": []string{"1", "2"}, + "b": []string{"hello", "world"}, + "c": ":)", + }, + }, + }, + + "restore on errors": { + config: common.MapStr{ + "fields": common.MapStr{ + "a": "a", + "b": "b", + "c": "a.b", + }, + "fail_on_error": true, + }, + input: beat.Event{ + Fields: common.MapStr{ + "a": "1,2", + "b": "hello,world", + "c": ":)", + }, + }, + expected: beat.Event{ + Fields: common.MapStr{ + "a": "1,2", + "b": "hello,world", + "c": ":)", + }, + }, + fail: true, + }, + } + + for title, tt := range tests { + t.Run(title, func(t *testing.T) { + processor, err := NewDecodeCSVField(common.MustNewConfigFrom(tt.config)) + if err != nil { + t.Fatal(err) + } + result, err := processor.Run(&tt.input) + if tt.expected.Fields != nil { + assert.Equal(t, tt.expected.Fields.Flatten(), result.Fields.Flatten()) + assert.Equal(t, tt.expected.Meta.Flatten(), result.Meta.Flatten()) + assert.Equal(t, tt.expected.Timestamp, result.Timestamp) + } + if tt.fail { + assert.Error(t, err) + t.Log("got expected error", err) + return + } + assert.NoError(t, err) + }) + } +} + +func TestDecodeCSVField_String(t *testing.T) { + p, err := NewDecodeCSVField(common.MustNewConfigFrom(common.MapStr{ + "fields": common.MapStr{ + "a": "csv.a", + "b": "csv.b", + }, + "separator": "#", + "ignore_missing": true, + })) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, "decode_csv_field={\"Fields\":{\"a\":\"csv.a\",\"b\":\"csv.b\"},\"IgnoreMissing\":true,\"TrimLeadingSpace\":false,\"OverwriteKeys\":false,\"FailOnError\":true,\"Separator\":\"#\"}", p.String()) +} diff --git a/libbeat/processors/script/javascript/module/processor/processor.go b/libbeat/processors/script/javascript/module/processor/processor.go index 6cc9a4fdd12..25c9cae6209 100644 --- a/libbeat/processors/script/javascript/module/processor/processor.go +++ b/libbeat/processors/script/javascript/module/processor/processor.go @@ -32,6 +32,7 @@ import ( "github.com/elastic/beats/libbeat/processors/add_observer_metadata" "github.com/elastic/beats/libbeat/processors/add_process_metadata" "github.com/elastic/beats/libbeat/processors/communityid" + "github.com/elastic/beats/libbeat/processors/decode_csv_fields" "github.com/elastic/beats/libbeat/processors/dissect" "github.com/elastic/beats/libbeat/processors/dns" "github.com/elastic/beats/libbeat/processors/script/javascript" @@ -50,6 +51,7 @@ var constructors = map[string]processors.Constructor{ "AddProcessMetadata": add_process_metadata.New, "CommunityID": communityid.New, "CopyFields": actions.NewCopyFields, + "DecodeCSVField": decode_csv_fields.NewDecodeCSVField, "DecodeJSONFields": actions.NewDecodeJSONFields, "Dissect": dissect.NewProcessor, "DNS": dns.New,