diff --git a/cmd/stanza/init_common.go b/cmd/stanza/init_common.go index 467395d37..54de30d8a 100644 --- a/cmd/stanza/init_common.go +++ b/cmd/stanza/init_common.go @@ -17,6 +17,7 @@ import ( _ "github.com/observiq/stanza/operator/builtin/parser/csv" _ "github.com/observiq/stanza/operator/builtin/parser/json" + _ "github.com/observiq/stanza/operator/builtin/parser/keyvalue" _ "github.com/observiq/stanza/operator/builtin/parser/regex" _ "github.com/observiq/stanza/operator/builtin/parser/severity" _ "github.com/observiq/stanza/operator/builtin/parser/syslog" diff --git a/docs/operators/key_value_parser.md b/docs/operators/key_value_parser.md new file mode 100644 index 000000000..377afee2e --- /dev/null +++ b/docs/operators/key_value_parser.md @@ -0,0 +1,101 @@ +## `key_value_parser` operator + +The `key_value_parser` operator parses the string-type field selected by `parse_from` into key value pairs. All values are of type string. + +### Configuration Fields + +| Field | Default | Description | +| --- | --- | --- | +| `id` | `key_value_parser` | A unique identifier for the operator | +| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries | +| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed into key value pairs | +| `parse_to` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed as into key value pairs | +| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md) | +| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) | +| `if` | | An [expression](/docs/types/expression.md) that, when set, will be evaluated to determine whether this operator should be used for the given entry. This allows you to do easy conditional parsing without branching logic with routers. | +| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator | +| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator | + + +### Example Configurations + + +#### Parse the field `message` into key value pairs + +Configuration: +```yaml +- type: key_value_parser + parse_from: message +``` + + + + + + + +
Input record Output record
+ +```json +{ + "timestamp": "", + "record": { + "message": "name=stanza" + } +} +``` + + + +```json +{ + "timestamp": "", + "record": { + "name": "stanza" + } +} +``` + +
+ +#### Parse the field `message` as key value pairs, and parse the timestamp + +Configuration: +```yaml +- type: key_value_parser + parse_from: message + timestamp: + parse_from: seconds_since_epoch + layout_type: epoch + layout: s +``` + + + + + + + +
Input record Output record
+ +```json +{ + "timestamp": "", + "record": { + "message": "name=stanza seconds_since_epoch=1136214245" + } +} +``` + + + +```json +{ + "timestamp": "2006-01-02T15:04:05-07:00", + "record": { + "name": "stanza" + } +} +``` + +
\ No newline at end of file diff --git a/go.mod b/go.mod index 90e2c8e5b..c281f4df0 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/cenkalti/backoff/v4 v4.1.1 github.com/elastic/go-elasticsearch/v7 v7.13.0 github.com/golang/protobuf v1.5.2 + github.com/hashicorp/go-multierror v1.1.0 github.com/hashicorp/go-uuid v1.0.2 github.com/jpillora/backoff v1.0.0 github.com/json-iterator/go v1.1.11 @@ -124,6 +125,7 @@ require ( github.com/gookit/color v1.2.5 // indirect github.com/gostaticanalysis/analysisutil v0.0.3 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-version v1.2.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect diff --git a/go.sum b/go.sum index 75a45d8af..c37ec11d4 100644 --- a/go.sum +++ b/go.sum @@ -785,6 +785,7 @@ github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyN github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/consul/sdk v0.6.0/go.mod h1:fY08Y9z5SvJqevyZNy6WWPXiG3KwBPAvlcdx16zZ0fM= github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= @@ -798,6 +799,7 @@ github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iP github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g4TbElacI= github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/hashicorp/go-plugin v1.0.1/go.mod h1:++UyYGoz3o5w9ZzAdZxtQKrWWP+iqPBn3cQptSMzBuY= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= diff --git a/license.yaml b/license.yaml index 452e83e48..0978d1825 100644 --- a/license.yaml +++ b/license.yaml @@ -14,6 +14,10 @@ exceptions: # MPL is approved as long as the source is not modified - path: "github.com/hashicorp/go-uuid" licenses: ["MPL-2.0"] + - path: "github.com/hashicorp/errwrap" + licenses: ["MPL-2.0"] + - path: "github.com/hashicorp/go-multierror" + licenses: ["MPL-2.0"] # ISC - path: "github.com/davecgh/go-spew" diff --git a/operator/builtin/parser/keyvalue/keyvalue.go b/operator/builtin/parser/keyvalue/keyvalue.go new file mode 100644 index 000000000..05df548b9 --- /dev/null +++ b/operator/builtin/parser/keyvalue/keyvalue.go @@ -0,0 +1,123 @@ +package keyvalue + +import ( + "context" + "fmt" + "strings" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" + + "github.com/hashicorp/go-multierror" +) + +func init() { + operator.Register("key_value_parser", func() operator.Builder { return NewKVParserConfig("") }) +} + +// NewKVParserConfig creates a new key value parser config with default values +func NewKVParserConfig(operatorID string) *KVParserConfig { + return &KVParserConfig{ + ParserConfig: helper.NewParserConfig(operatorID, "key_value_parser"), + Delimiter: "=", + } +} + +// KVParserConfig is the configuration of a key value parser operator. +type KVParserConfig struct { + helper.ParserConfig `yaml:",inline"` + + Delimiter string `json:"delimiter" yaml:"delimiter"` +} + +// Build will build a key value parser operator. +func (c KVParserConfig) Build(context operator.BuildContext) ([]operator.Operator, error) { + parserOperator, err := c.ParserConfig.Build(context) + if err != nil { + return nil, err + } + + if len(c.Delimiter) == 0 { + return nil, fmt.Errorf("delimiter is a required parameter") + } + + kvParser := &KVParser{ + ParserOperator: parserOperator, + delimiter: c.Delimiter, + } + + return []operator.Operator{kvParser}, nil +} + +// KVParser is an operator that parses key value pairs. +type KVParser struct { + helper.ParserOperator + delimiter string +} + +// Process will parse an entry for key value pairs. +func (kv *KVParser) Process(ctx context.Context, entry *entry.Entry) error { + return kv.ParserOperator.ProcessWith(ctx, entry, kv.parse) +} + +// parse will parse a value as key values. +func (kv *KVParser) parse(value interface{}) (interface{}, error) { + switch m := value.(type) { + case string: + return kv.parser(m, kv.delimiter) + case []byte: + return kv.parser(string(m), kv.delimiter) + default: + return nil, fmt.Errorf("type %T cannot be parsed as key value pairs", value) + } +} + +func (kv *KVParser) parser(input string, delimiter string) (map[string]interface{}, error) { + if len(input) == 0 { + return nil, fmt.Errorf("parse from field %s is empty", kv.ParseFrom.String()) + } + + parsed := make(map[string]interface{}) + + var err error + for _, raw := range splitStringByWhitespace(input) { + m := strings.Split(raw, delimiter) + if len(m) != 2 { + e := fmt.Errorf("expected '%s' to split by '%s' into two items, got %d", raw, delimiter, len(m)) + err = multierror.Append(err, e) + continue + } + + key := cleanString(m[0]) + value := cleanString(m[1]) + + // TODO: Check if key already exists and fail if so? + parsed[key] = value + } + + return parsed, err +} + +// split on whitespace and preserve quoted text +func splitStringByWhitespace(input string) []string { + quoted := false + raw := strings.FieldsFunc(input, func(r rune) bool { + if r == '"' { + quoted = !quoted + } + return !quoted && r == ' ' + }) + return raw +} + +// trim leading and trailing space +func cleanString(input string) string { + if len(input) > 0 && input[0] == '"' { + input = input[1:] + } + if len(input) > 0 && input[len(input)-1] == '"' { + input = input[:len(input)-1] + } + return strings.TrimSpace(input) +} diff --git a/operator/builtin/parser/keyvalue/keyvalue_test.go b/operator/builtin/parser/keyvalue/keyvalue_test.go new file mode 100644 index 000000000..857a93126 --- /dev/null +++ b/operator/builtin/parser/keyvalue/keyvalue_test.go @@ -0,0 +1,452 @@ +package keyvalue + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/observiq/stanza/entry" + "github.com/observiq/stanza/operator" + "github.com/observiq/stanza/operator/helper" + "github.com/observiq/stanza/testutil" + "go.uber.org/zap" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func newTestParser(t *testing.T) *KVParser { + config := NewKVParserConfig("test") + ops, err := config.Build(testutil.NewBuildContext(t)) + op := ops[0] + require.NoError(t, err) + return op.(*KVParser) +} + +func TestKVParserConfigBuild(t *testing.T) { + config := NewKVParserConfig("test") + ops, err := config.Build(testutil.NewBuildContext(t)) + op := ops[0] + require.NoError(t, err) + require.IsType(t, &KVParser{}, op) +} + +func TestKVParserConfigBuildFailure(t *testing.T) { + config := NewKVParserConfig("test") + config.OnError = "invalid_on_error" + _, err := config.Build(testutil.NewBuildContext(t)) + require.Error(t, err) + require.Contains(t, err.Error(), "invalid `on_error` field") +} + +func TestBuild(t *testing.T) { + basicConfig := func() *KVParserConfig { + cfg := NewKVParserConfig("test_operator_id") + return cfg + } + + cases := []struct { + name string + input *KVParserConfig + expectErr bool + }{ + { + "default", + func() *KVParserConfig { + cfg := basicConfig() + return cfg + }(), + false, + }, + { + "delimiter", + func() *KVParserConfig { + cfg := basicConfig() + cfg.Delimiter = "/" + return cfg + }(), + false, + }, + { + "missing-delimiter", + func() *KVParserConfig { + cfg := basicConfig() + cfg.Delimiter = "" + return cfg + }(), + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + cfg := tc.input + _, err := cfg.Build(testutil.NewBuildContext(t)) + if tc.expectErr { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + +func TestKVParserStringFailure(t *testing.T) { + parser := newTestParser(t) + _, err := parser.parse("invalid") + require.Error(t, err) + require.Contains(t, err.Error(), fmt.Sprintf("expected '%s' to split by '%s' into two items, got", "invalid", parser.delimiter)) +} + +func TestKVParserByteFailure(t *testing.T) { + parser := newTestParser(t) + _, err := parser.parse([]byte("invalid")) + require.Error(t, err) + require.Contains(t, err.Error(), fmt.Sprintf("expected '%s' to split by '%s' into two items, got", "invalid", parser.delimiter)) +} + +func TestKVParserInvalidType(t *testing.T) { + parser := newTestParser(t) + _, err := parser.parse([]int{}) + require.Error(t, err) + require.Contains(t, err.Error(), "type []int cannot be parsed as key value pairs") +} + +func NewFakeKVOperator() (*KVParser, *testutil.Operator) { + mock := testutil.Operator{} + logger, _ := zap.NewProduction() + return &KVParser{ + ParserOperator: helper.ParserOperator{ + TransformerOperator: helper.TransformerOperator{ + WriterOperator: helper.WriterOperator{ + BasicOperator: helper.BasicOperator{ + OperatorID: "test", + OperatorType: "key_value_parser", + SugaredLogger: logger.Sugar(), + }, + OutputOperators: []operator.Operator{&mock}, + }, + }, + ParseFrom: entry.NewRecordField("testfield"), + ParseTo: entry.NewRecordField("testparsed"), + }, + }, &mock +} + +func TestKVImplementations(t *testing.T) { + require.Implements(t, (*operator.Operator)(nil), new(KVParser)) +} + +func TestKVParser(t *testing.T) { + cases := []struct { + name string + inputRecord map[string]interface{} + expectedRecord map[string]interface{} + delimiter string + errorExpected bool + }{ + { + "simple", + map[string]interface{}{ + "testfield": "name=stanza age=2", + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{ + "name": "stanza", + "age": "2", + }, + }, + "=", + false, + }, + { + "double-quotes-removed", + map[string]interface{}{ + "testfield": "name=\"stanza\" age=2", + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{ + "name": "stanza", + "age": "2", + }, + }, + "=", + false, + }, + { + "double-quotes-spaces-removed", + map[string]interface{}{ + "testfield": `name=" stanza " age=2`, + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{ + "name": "stanza", + "age": "2", + }, + }, + "=", + false, + }, + { + "leading-and-trailing-space", + map[string]interface{}{ + "testfield": `" name "=" stanza " age=2`, + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{ + "name": "stanza", + "age": "2", + }, + }, + "=", + false, + }, + { + "bar-delimiter", + map[string]interface{}{ + "testfield": `name|" stanza " age|2 key|value`, + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{ + "name": "stanza", + "age": "2", + "key": "value", + }, + }, + "|", + false, + }, + { + "double-delimiter", + map[string]interface{}{ + "testfield": `name==" stanza " age==2 key==value`, + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{ + "name": "stanza", + "age": "2", + "key": "value", + }, + }, + "==", + false, + }, + { + "bar-delimiter", + map[string]interface{}{ + "testfield": `test/value a/b 2/text`, + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{ + "test": "value", + "a": "b", + "2": "text", + }, + }, + "/", + false, + }, + { + "large", + map[string]interface{}{ + "testfield": "name=stanza age=1 job=\"software engineering\" location=\"grand rapids michigan\" src=\"10.3.3.76\" dst=172.217.0.10 protocol=udp sport=57112 dport=443 translated_src_ip=96.63.176.3 translated_port=57112", + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{ + "age": "1", + "dport": "443", + "dst": "172.217.0.10", + "job": "software engineering", + "location": "grand rapids michigan", + "name": "stanza", + "protocol": "udp", + "sport": "57112", + "src": "10.3.3.76", + "translated_port": "57112", + "translated_src_ip": "96.63.176.3", + }, + }, + "=", + false, + }, + { + "missing-delimiter", + map[string]interface{}{ + "testfield": `test text`, + }, + map[string]interface{}{}, + "/", + true, + }, + { + "invalid-pair", + map[string]interface{}{ + "testfield": `test=text=abc`, + }, + map[string]interface{}{}, + "=", + true, + }, + { + "empty-input", + map[string]interface{}{ + "testfield": "", + }, + map[string]interface{}{}, + "=", + true, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + input := entry.New() + input.Record = tc.inputRecord + + output := entry.New() + output.Record = tc.expectedRecord + + parser, mockOutput := NewFakeKVOperator() + parser.delimiter = tc.delimiter + mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + e := args[1].(*entry.Entry) + require.Equal(t, tc.expectedRecord, e.Record) + }).Return(nil) + + err := parser.Process(context.Background(), input) + if tc.errorExpected { + require.Error(t, err) + return + } + require.NoError(t, err) + }) + } +} + +func TestKVParserWithEmbeddedTimeParser(t *testing.T) { + + testTime := time.Unix(1136214245, 0) + + cases := []struct { + name string + inputRecord map[string]interface{} + expectedRecord map[string]interface{} + errorExpected bool + preserveTo *entry.Field + }{ + { + "simple", + map[string]interface{}{ + "testfield": "timestamp=1136214245", + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{}, + }, + false, + nil, + }, + { + "preserve", + map[string]interface{}{ + "testfield": "timestamp=1136214245", + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{}, + "original_timestamp": "1136214245", + }, + false, + func() *entry.Field { + f := entry.NewRecordField("original_timestamp") + return &f + }(), + }, + { + "preserve-multi-fields", + map[string]interface{}{ + "testfield": "superkey=superval timestamp=1136214245", + }, + map[string]interface{}{ + "testparsed": map[string]interface{}{ + "superkey": "superval", + }, + }, + false, + nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + input := entry.New() + input.Record = tc.inputRecord + + output := entry.New() + output.Record = tc.expectedRecord + + parser, mockOutput := NewFakeKVOperator() + parser.delimiter = "=" + parseFrom := entry.NewRecordField("testparsed", "timestamp") + parser.ParserOperator.TimeParser = &helper.TimeParser{ + ParseFrom: &parseFrom, + LayoutType: "epoch", + Layout: "s", + PreserveTo: tc.preserveTo, + } + mockOutput.On("Process", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + e := args[1].(*entry.Entry) + require.Equal(t, tc.expectedRecord, e.Record) + require.Equal(t, testTime, e.Timestamp) + }).Return(nil) + + err := parser.Process(context.Background(), input) + require.NoError(t, err) + }) + } +} + +func TestSplitStringByWhitespace(t *testing.T) { + cases := []struct { + name string + intput string + output []string + }{ + { + "simple", + "k=v a=b x=\" y \" job=\"software engineering\"", + []string{ + "k=v", + "a=b", + "x=\" y \"", + "job=\"software engineering\"", + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.output, splitStringByWhitespace(tc.intput)) + }) + } +} + +func BenchmarkParse(b *testing.B) { + input := "name=stanza age=1 job=\"software engineering\" location=\"grand rapids michigan\" timestamp=1136214245 src=\"10.3.3.76\" dst=172.217.0.10 protocol=udp sport=57112 dport=443 translated_src_ip=96.63.176.3 translated_port=57112" + + kv := KVParser{ + delimiter: "=", + } + + timeParseFrom := entry.NewRecordField("timestamp") + kv.ParserOperator.TimeParser = &helper.TimeParser{ + ParseFrom: &timeParseFrom, + LayoutType: "epoch", + Layout: "s", + } + + for n := 0; n < b.N; n++ { + if _, err := kv.parse(input); err != nil { + b.Fatal(err) + } + } +}