From dd75a8bf1800939367027333205b45574498e8a1 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Wed, 15 Apr 2020 14:23:26 +0200 Subject: [PATCH] [Agent] Expose stream.* data in every event (#17468) (#17722) [Agent] Expose stream.* data in every event (#17468) (cherry picked from commit 70fba87abfd417eac734e3e736b54cbc5438ba04) --- x-pack/elastic-agent/CHANGELOG.asciidoc | 2 + .../pkg/agent/program/program_test.go | 3 +- .../pkg/agent/program/supported.go | 2 +- .../testdata/constraints_config-filebeat.yml | 6 + .../testdata/enabled_output_true-filebeat.yml | 6 + .../testdata/enabled_true-filebeat.yml | 6 + .../testdata/single_config-filebeat.yml | 6 + .../testdata/single_config-metricbeat.yml | 15 ++ .../agent/program/testdata/single_config.yml | 4 + .../pkg/agent/transpiler/merge_strategy.go | 95 +++++++++++ .../pkg/agent/transpiler/rules.go | 159 +++++++++++++++--- .../pkg/agent/transpiler/rules_test.go | 14 +- x-pack/elastic-agent/spec/filebeat.yml | 13 +- x-pack/elastic-agent/spec/metricbeat.yml | 13 +- 14 files changed, 311 insertions(+), 33 deletions(-) create mode 100644 x-pack/elastic-agent/pkg/agent/transpiler/merge_strategy.go diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 63190e42d5e..bf28d7b3a37 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -25,3 +25,5 @@ - Display the stability of the agent at enroll and start. {pull}17336[17336] - Support for config constraints {pull}17112[17112] - Introduced post install hooks {pull}17241[17241] +- Expose stream.* variables in events {pull}17468[17468] + diff --git a/x-pack/elastic-agent/pkg/agent/program/program_test.go b/x-pack/elastic-agent/pkg/agent/program/program_test.go index 85d471c6583..d1b5c091718 100644 --- a/x-pack/elastic-agent/pkg/agent/program/program_test.go +++ b/x-pack/elastic-agent/pkg/agent/program/program_test.go @@ -12,6 +12,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" yaml "gopkg.in/yaml.v2" @@ -492,7 +493,7 @@ func TestConfiguration(t *testing.T) { require.NoError(t, err) var m map[string]interface{} err = yamltest.FromYAML(programConfig, &m) - require.NoError(t, err) + require.NoError(t, errors.Wrap(err, program.Cmd())) compareMap := &transpiler.MapVisitor{} program.Config.Accept(compareMap) diff --git a/x-pack/elastic-agent/pkg/agent/program/supported.go b/x-pack/elastic-agent/pkg/agent/program/supported.go index 4044105b6cc..fe62d0afc28 100644 --- a/x-pack/elastic-agent/pkg/agent/program/supported.go +++ b/x-pack/elastic-agent/pkg/agent/program/supported.go @@ -19,7 +19,7 @@ func init() { // Packed Files // spec/filebeat.yml // spec/metricbeat.yml - unpacked := packer.MustUnpack("eJzsV02To7oV3edn9DapBES7E1I1C0M/BNhNj3E3EtpZkhuwJUw1+ANS+e8pCRsb98xLTb3tW0x5GqT7ce6951z+81BXa/aPj0Ks6XrV/L2V4uHfD1R6DXnbZbEUNUGhSPFilgJz+1o4coVOIpVJzqdVziTvXguHBoXpBcUxC8pIcD85zqWo6XIiqPQKCpPtd0Ry6kdCn7k/W8aCYqdOcSzmMtmnKKwJWthEejUD78XcnRbz9/6XIm+fIi4oSvbcnTQUxOI7zhoGvc2qNSWFieBuUAdu0MRL9Rs2KZrkBCQNQRPj1j73Q5MsQ+7K6EAlqYiVtPrd2y4jVnhMj1XL0UmwbjebL52KykqkVvyxQpMtwdmTW0yzwHWMNXbEa+HUFPDOzXZNAJNH7oe5Okuh6LhnVxwmjbITuNMs8J2cw+wpgHHOoddR5Rck3dmePvtaOBUtHZP7L+c78eH6LOq4H4oUmWd/Zs6e722LPfMTg1mRwWSSU3yxE5oUJt1rtsvU3/Ols6GWM8HAq6lnG9S06xWOjMv7G99Xn8V0eEctIljJ2xWOFQYyRaeOLG7uwuQxBcmRP+8y1IaDDV1/Kbb971ATXWMOhUFhX29dV+CVpK/lPkWmYJaTp+e+SIG9Z1bcpkjsuf/S38eRwMA2ue+Y3A35r9XkJ7hBYRBkPAX+PZ63dXmZuYWRUeQdL/gx6Bmr5122QpMjx/GlxkNf9TgZmYqNlpGegTH257j8eMLg+2x47jpNip0jwcHTbT0Ct+/Dvh/ZzXldC6BnAJCKwvenAJ4O5Hjjy51mBBNBy8WBWfF2hR51vgSK/Z0PwaWaP9tIramKbcOmP7bDUXx8LRyT+NO7WE4H0tpbCqJPlUMA40MKGsGysR0K7XJuhYJA0WErqqnFVV5Pgd8/+5o7OzBLdOqe6tk1jjQOfQ+QivqxYMIGKTqZBL9cZrgjitOs+MA2X2r1SfB2jL+vuCwZ1U/5O8fS3GHWBH6yveAzXzoKs+EegfZmBZL2enaUl7I1YUDXSXGQ4l6D4PCDS6/m6DrHDCQ1QZFBreDaw+farkAyeS2cgYvPed1hoHzFLUfvuodTYB/Xb+f8IGkpML7Y5fBfTxpTmBQEecYI481lriJBYbLh0G5v8SI3ujJgaCWGsj/EAxuxxiqnULgl3xHVj7/lOTPULJrdzE3+icDV/1ye4zNNe7acyrBwghRHrzecsQsHXmB14HJBoddxKDYMJDmT0S5sj1l47rewParYyhXw5Ar8Vs7daanqzKw4ZyArZ4vdt4e/9fIp181nwX4goG8oMZgUm7NgbihaPAXQFNwPqxSchRWHJesUQQ6i1REcm8ydVBQae4ImeSpPgjybPcEuRwJ3OatAOVLoGeT/Ca80cyq9kiBTDemeIntL3szHOVYg1Q1RvvDi94T3ah/rpinwotpQMJErxE2m3ymC4S2DLzMGbU24tIyMFJ3qvmkVUdqAJLZeKK6k5xxTHO9eC+XT3mqibCfbuRVNmBULupxMKDoWw+D7sVpChBKYQJoH4ic1wS974tpq+BVp79fIrOcyrhjoYw6yXxL2huC4XaHoT3H/g+KOFxpvsfaVYEcGBqeKKdyxWgAHMj7X/wd4jDG8f27Q7oa8sx8TJrZ4xWH+wWRSEpwPQn0mSV3zi0ir+cXF4+ccnOfVetn+rmD8UZGBA1Y/FRq9tPY4Nime/gSzn/aDwaS3Z+CUc5h8cCCMlWe3BHGx9qejReKyuHBfHEc190PVC+vZs7343oviX+dFXX3FqP+nfTzvsnC0wBgZ031hjpedi9B8XXZ0bS98cBUIM1+B5CPFYZvi7Sh+3VNv6k7PvwwMwnIjXpeY1YfIy21sjep9DBoxiKB7W6ebe1dhbwkyD1wmH7qXRnlpO9nA479yB3pGirWf2WWeL+dIGR5Ujne9dMbqVnf03Vb5p+j90jOSSbu54bexiI5EfuBVvRwo8eYwab8uH+Ftr43sXmt2FuArBp8Ei04tCKqvOcgF3eyyJfS6Zc8LuxA3dJRP/+GnextjowrcVPaLgcov6dRHA1lm5WWh0KIOkjrFkaF4nCCvTUFWznUMvCUorlirF4LPFJHPdKn/rxdpxeErl1Vu9u3bw3//8r8AAAD//7NUG+I=") + unpacked := packer.MustUnpack("eJzsV0uTo7oV3udnzDqVy6PdCanKwtAXAfbQ17hbEtohyQ3YEqbG2BhS+e8pCYMfPXNTk1Q2qSy63AXSeX7nfB9//3KoN+yXj1Js6CZr/tRJ8eWvX6j0G/K2zxMpDgRFIsWrRWqZu9fSlRk6i1TCgs/rgknev5YuDUvTD8s2D6tY8AC2SykOdD0TVPolBXD3GyIFDWKhzzyerRJBsXtIcSKWEh5TFB0IWjlE+gdmvZdLb14u34dfivxjirigCB65N2uolYjfcN4w4G+zzpQUQMG98BB6YZOs1W/UpGhWEAs2BM2MW/s8iEyyjrgn4xOVpCY27PS7t31O7KhN27rj6CxYv18s125NZS1SO/nI0GxHcP7slfM89Fxjg13xWroHavHey/dNCMQxk3DLfafnQSRSZH6wIDqlFuyZ5XSv+T4PvXlOrdlHajlHIs91aq+e1T1mwY77TkGqRLDLOR6IVsVEgVOxVsciNkHSpSg2sHWumYoJqxp+HWNqU5zsX0t3l+KkYJZjMhkL1g729Nm3fZ6hWctx0o950P76jNlJR5DfXPw1KZ4/2q5p5Zo8iMccRztTzZSv0DPy1HLaDXQKCs4fHDgfFIiev4zvr75vfObTO+AcsaX7c8hQbLyW7pFaTjv4Gv4ILrYEu4bGYRUbTMKC4q8aLxla6V+CZkUqz4JobEQtk86W4LindtSHwniM1aCmc8hwbIy9GmIRBkHGcxi4HbViwez4xKqv/24eNa1iwarkI5VQUjsSXv5Tvf1R/WsOYPNauo/P7/o74NQs2NiHwC04yAcMBtAYY2cVPJDJ97wJdWyzngLfIG93PRzj0vNw2x+K/JYB2N3VUj1XeO4HXN+eDz1XcKnm29G7Zpgttri9GwJo6Xm2Y4MAcdT5gqSmDz4IJoJWqxMHcatiS+35D+zAHXvZ5xwl7UMsyveJAGebWbBTM0Wt+BvB4YOd84l0TkdQUjPT6SlwbJXXa+kOz9rPuS/teMas84lozIhe12HAgMrbIDj64NI/cAT7sXbMggeCYoPa4adeZRac3dfftVJ0Nu/6p+K8nF+u72u2XLsmCebTLKiaXe8lp9Rqpp20XN/npWxtcKz75JVGTqQ4cAA7bKu9D6/7IIgFBXDLgdNdMXyZIdCIjdq946zOL/vvoQbLtWuwCooBw/GJBbtxNiSTTvPJbpCc7nnrtsbheLcn6r2dqJm+qRepKYAlQb5xfeaYPHBNPp/i+Uaw6FVOZFXbGYAKj2GK40jNIutysTGbq39vNs4u/1jv8qhM8zXw+zfFuThuUxSLhcdPHCctx6tq6c0rgs4Fs5M6tWOR4mibeewQenzoQccOKrbIagoimyLq2jxSuLJjI8XxPup2iy9/HGhebppvJfsO0b8haDApthdi31KkCMkUPIjq1LoIABzpgUXdRK49wYnJVELAOE6NezFlis49Wd8R8XjWIMhs9QL5VwJBmgWVfkWQqYb4SJGzI2/m0xK7RWodGqJ84dXvCYSrfZx0HL2XeFVvqTWTGeIm0+/en0PAOwa+Lhhweu47ajEbKTofBsAZOUWOReBlGU0LZlx4yqez06TXzXZ6qO1E0PVsRlFbTsskSJRYEvxlr/I6kUAv1iPxHDU4iliOG2QeljKpmTXEHOY/JUAagpMuQ/F/V4RccPJ/IfI/LUSaEMAnHkSFwpuuk+9oYXFd5A+CwZ7yXdzH+PB8WMbTgv4uWZiOneFkj+1IEAs+TSLlQhAai6sLIcr359B/Oi46Z8J/NP9dsvxPCXbaKT8k2UDvGp2bEj+TbeAb2csnvH/Cqao7rWL9QYRtXnNQfDAJK4KL9g4HYw+CZMbA+23/OzWruMx/eX856/3wW/n0bbH+XKPBjvKRP4decivemjCIxIUfbm2PJPtJ6HHwF8UZw25bTeTYpKgR2PI7Jv3Zd3E88owdC34Xl8bKFDOZanaJbdjL3wgeBcD8tk8391ZTbZmEDbWJwJbG0m1e2k4Ipg/En7iTFBz42o/quZ7d8RwgHbWMT1i61OqOX9Vdpj9QzYlnSBWd6M3evRcQdzOlPoZNCuBl/5OaBolg28/C6xZrd3avgmYUH9M5VVOmxJF8X3gV3xP09Bz+WhTMUD00+4UH/4ysu3yOEy+aprNYz2U0iKJXlR+zFYe/7yPrIqYGQSMo8HsOxJZZsGBSiZdWxVAxCXcZ/qrFUGb5MrN+1f/rjwhbcUteLVb7v335xx/+GQAA//+LKLK5") SupportedMap = make(map[string]bool) for f, v := range unpacked { diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/constraints_config-filebeat.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/constraints_config-filebeat.yml index c61497d3404..c19f9c36629 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/constraints_config-filebeat.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/constraints_config-filebeat.yml @@ -5,6 +5,12 @@ filebeat: - /var/log/hello1.log - /var/log/hello2.log index: logs-generic-default + processors: + - add_fields: + fields: + stream.type: logs + stream.dataset: generic + stream.namespace: default output: elasticsearch: hosts: diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml index 2dcc22e14b8..413a6866e91 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_output_true-filebeat.yml @@ -5,6 +5,12 @@ filebeat: - /var/log/hello1.log - /var/log/hello2.log index: logs-generic-default + processors: + - add_fields: + fields: + stream.type: logs + stream.dataset: generic + stream.namespace: default output: elasticsearch: enabled: true diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_true-filebeat.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_true-filebeat.yml index 141e9d95d6d..a31fe4e37dd 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_true-filebeat.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/enabled_true-filebeat.yml @@ -6,6 +6,12 @@ filebeat: - /var/log/hello1.log - /var/log/hello2.log index: logs-generic-default + processors: + - add_fields: + fields: + stream.type: logs + stream.dataset: generic + stream.namespace: default output: elasticsearch: hosts: diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-filebeat.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-filebeat.yml index dd26c83681a..39c54159d10 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-filebeat.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-filebeat.yml @@ -7,6 +7,12 @@ filebeat: index: logs-generic-default vars: var: value + processors: + - add_fields: + fields: + stream.type: logs + stream.dataset: generic + stream.namespace: default output: elasticsearch: hosts: diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-metricbeat.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-metricbeat.yml index 7fc9f33e6b0..056233819f3 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-metricbeat.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config-metricbeat.yml @@ -4,10 +4,25 @@ metricbeat: metricsets: [status] index: metrics-docker.status-default hosts: ["http://127.0.0.1:8080"] + processors: + - add_fields: + fields: + stream.type: metrics + stream.dataset: docker.status + stream.namespace: default - module: apache metricsets: [info] index: metrics-generic-testing hosts: ["http://apache.remote"] + processors: + - add_fields: + fields: + should_be: first + - add_fields: + fields: + stream.type: metrics + stream.dataset: generic + stream.namespace: testing output: elasticsearch: hosts: [127.0.0.1:9200, 127.0.0.1:9300] diff --git a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config.yml b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config.yml index b986a7b686b..bcf158e74d6 100644 --- a/x-pack/elastic-agent/pkg/agent/program/testdata/single_config.yml +++ b/x-pack/elastic-agent/pkg/agent/program/testdata/single_config.yml @@ -38,6 +38,10 @@ datasources: use_output: default inputs: - type: apache/metrics + processors: + - add_fields: + fields: + should_be: first streams: - enabled: true metricset: info diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/merge_strategy.go b/x-pack/elastic-agent/pkg/agent/transpiler/merge_strategy.go new file mode 100644 index 00000000000..da96b5c11d2 --- /dev/null +++ b/x-pack/elastic-agent/pkg/agent/transpiler/merge_strategy.go @@ -0,0 +1,95 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package transpiler + +import "fmt" + +type injector interface { + Inject(target []Node, source interface{}) []Node + InjectItem(target []Node, source Node) []Node + InjectCollection(target []Node, source []Node) []Node +} + +func mergeStrategy(strategy string) injector { + + switch strategy { + case "insert_before": + return injectBeforeInjector{} + case "insert_after": + return injectAfterInjector{} + case "replace": + return replaceInjector{} + case "noop": + return noopInjector{} + } + + return injectAfterInjector{} +} + +type noopInjector struct{} + +func (i noopInjector) Inject(target []Node, source interface{}) []Node { + return inject(i, target, source) +} + +func (noopInjector) InjectItem(target []Node, source Node) []Node { return target } + +func (noopInjector) InjectCollection(target []Node, source []Node) []Node { return target } + +type injectAfterInjector struct{} + +func (i injectAfterInjector) Inject(target []Node, source interface{}) []Node { + return inject(i, target, source) +} + +func (injectAfterInjector) InjectItem(target []Node, source Node) []Node { + return append(target, source) +} + +func (injectAfterInjector) InjectCollection(target []Node, source []Node) []Node { + return append(target, source...) +} + +type injectBeforeInjector struct{} + +func (i injectBeforeInjector) Inject(target []Node, source interface{}) []Node { + return inject(i, target, source) +} + +func (injectBeforeInjector) InjectItem(target []Node, source Node) []Node { + return append([]Node{source}, target...) +} + +func (injectBeforeInjector) InjectCollection(target []Node, source []Node) []Node { + return append(source, target...) +} + +type replaceInjector struct{} + +func (i replaceInjector) Inject(target []Node, source interface{}) []Node { + return inject(i, target, source) +} + +func (replaceInjector) InjectItem(target []Node, source Node) []Node { + return []Node{source} +} + +func (replaceInjector) InjectCollection(target []Node, source []Node) []Node { + return source +} + +func inject(i injector, target []Node, source interface{}) []Node { + if sourceCollection, ok := source.([]Node); ok { + fmt.Printf(">>[%T] list of nodes %T %d\n", i, source, len(sourceCollection)) + return i.InjectCollection(target, sourceCollection) + } + + if node, ok := source.(Node); ok { + fmt.Printf(">> one of nodes %T\n", source) + return i.InjectItem(target, node) + } + + return target +} diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/rules.go b/x-pack/elastic-agent/pkg/agent/transpiler/rules.go index 50eddecf3ab..286c162bc0d 100644 --- a/x-pack/elastic-agent/pkg/agent/transpiler/rules.go +++ b/x-pack/elastic-agent/pkg/agent/transpiler/rules.go @@ -69,6 +69,8 @@ func (r *RuleList) MarshalYAML() (interface{}, error) { name = "extract_list_items" case *InjectIndexRule: name = "inject_index" + case *InjectStreamProcessorRule: + name = "inject_stream_processor" case *MakeArrayRule: name = "make_array" case *RemoveKeyRule: @@ -145,6 +147,8 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error { r = &ExtractListItemRule{} case "inject_index": r = &InjectIndexRule{} + case "inject_stream_processor": + r = &InjectStreamProcessorRule{} case "make_array": r = &MakeArrayRule{} case "remove_key": @@ -235,9 +239,9 @@ func MakeArray(item Selector, to string) *MakeArrayRule { // CopyToListRule is a rule which copies a specified // node into every item in a provided list. type CopyToListRule struct { - Item Selector - To string - Overwrite bool + Item Selector + To string + OnConflict string `yaml:"on_conflict" config:"on_conflict"` } // Apply copies specified node into every item of the list. @@ -265,13 +269,18 @@ func (r *CopyToListRule) Apply(ast *AST) error { continue } - if !r.Overwrite { - _, found := listItemMap.Find(r.Item) - if found { - continue + if existingNode, found := listItemMap.Find(r.Item); found { + sourceNodeItemsList := sourceNode.Clone().Value().(Node) // key.value == node + if existingList, ok := existingNode.Value().(*List); ok { + existingList.value = mergeStrategy(r.OnConflict).Inject(existingList.Clone().Value().([]Node), sourceNodeItemsList.Value()) + } else if existingMap, ok := existingNode.Value().(*Dict); ok { + existingMap.value = mergeStrategy(r.OnConflict).Inject(existingMap.Clone().Value().([]Node), sourceNodeItemsList.Value()) } + + continue } + // if not conflicting move entire node listItemMap.value = append(listItemMap.value, sourceNode.Clone()) } @@ -279,20 +288,20 @@ func (r *CopyToListRule) Apply(ast *AST) error { } // CopyToList creates a CopyToListRule -func CopyToList(item Selector, to string, overwrite bool) *CopyToListRule { +func CopyToList(item Selector, to, onMerge string) *CopyToListRule { return &CopyToListRule{ - Item: item, - To: to, - Overwrite: overwrite, + Item: item, + To: to, + OnConflict: onMerge, } } // CopyAllToListRule is a rule which copies a all nodes // into every item in a provided list. type CopyAllToListRule struct { - To string - Except []string - Overwrite bool + To string + Except []string + OnConflict string `yaml:"on_conflict" config:"on_conflict"` } // Apply copies all nodes into every item of the list. @@ -319,7 +328,7 @@ func (r *CopyAllToListRule) Apply(ast *AST) error { continue } - if err := CopyToList(item, r.To, r.Overwrite).Apply(ast); err != nil { + if err := CopyToList(item, r.To, r.OnConflict).Apply(ast); err != nil { return err } } @@ -328,11 +337,11 @@ func (r *CopyAllToListRule) Apply(ast *AST) error { } // CopyAllToList creates a CopyAllToListRule -func CopyAllToList(to string, overwrite bool, except ...string) *CopyAllToListRule { +func CopyAllToList(to, onMerge string, except ...string) *CopyAllToListRule { return &CopyAllToListRule{ - To: to, - Except: except, - Overwrite: overwrite, + To: to, + Except: except, + OnConflict: onMerge, } } @@ -414,9 +423,7 @@ func (r *InjectIndexRule) Apply(ast *AST) error { value: &StrVal{value: fmt.Sprintf("%s-%s-%s", r.Type, dataset, namespace)}, }) } - } - } return nil @@ -429,6 +436,116 @@ func InjectIndex(indexType string) *InjectIndexRule { } } +// InjectStreamProcessorRule injects a add fields processor providing +// stream type, namespace and dataset fields into events. +type InjectStreamProcessorRule struct { + Type string + OnConflict string `yaml:"on_conflict" config:"on_conflict"` +} + +// Apply injects processor into input. +func (r *InjectStreamProcessorRule) Apply(ast *AST) error { + const defaultNamespace = "default" + const defaultDataset = "generic" + + datasourcesNode, found := Lookup(ast, "datasources") + if !found { + return nil + } + + datasourcesList, ok := datasourcesNode.Value().(*List) + if !ok { + return nil + } + + for _, datasourceNode := range datasourcesList.value { + namespace := defaultNamespace + nsNode, found := datasourceNode.Find("namespace") + if found { + nsKey, ok := nsNode.(*Key) + if ok { + namespace = nsKey.value.String() + } + } + + // get input + inputNode, found := datasourceNode.Find("inputs") + if !found { + continue + } + + inputsList, ok := inputNode.Value().(*List) + if !ok { + continue + } + + for _, inputNode := range inputsList.value { + streamsNode, ok := inputNode.Find("streams") + if !ok { + continue + } + + streamsList, ok := streamsNode.Value().(*List) + if !ok { + continue + } + + for _, streamNode := range streamsList.value { + streamMap, ok := streamNode.(*Dict) + if !ok { + continue + } + + dataset := defaultDataset + + dsNode, found := streamNode.Find("dataset") + if found { + dsKey, ok := dsNode.(*Key) + if ok { + dataset = dsKey.value.String() + } + } + + // get processors node + processorsNode, found := streamNode.Find("processors") + if !found { + processorsNode = &Key{ + name: "processors", + value: &List{value: make([]Node, 0)}, + } + + streamMap.value = append(streamMap.value, processorsNode) + } + + processorsList, ok := processorsNode.Value().(*List) + if !ok { + return errors.New("InjectStreamProcessorRule: processors is not a list") + } + + processorMap := &Dict{value: make([]Node, 0)} + processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{ + &Key{name: "stream.type", value: &StrVal{value: r.Type}}, + &Key{name: "stream.namespace", value: &StrVal{value: namespace}}, + &Key{name: "stream.dataset", value: &StrVal{value: dataset}}, + }}}) + + addFieldsMap := &Dict{value: []Node{&Key{"add_fields", processorMap}}} + processorsList.value = mergeStrategy(r.OnConflict).InjectItem(processorsList.value, addFieldsMap) + } + } + } + + return nil +} + +// InjectStreamProcessor creates a InjectStreamProcessorRule +func InjectStreamProcessor(onMerge, streamType string) *InjectStreamProcessorRule { + return &InjectStreamProcessorRule{ + OnConflict: onMerge, + Type: streamType, + } +} + // ExtractListItemRule extract items with specified name from a list of maps. // The result is store in a new array. // Example: diff --git a/x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go b/x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go index 36463cfabc0..2fc9fedd20f 100644 --- a/x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go +++ b/x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go @@ -481,7 +481,7 @@ inputs: `, rule: &RuleList{ Rules: []Rule{ - CopyToList("namespace", "inputs", false), + CopyToList("namespace", "inputs", "insert_after"), }, }, }, @@ -561,8 +561,9 @@ func TestSerialization(t *testing.T) { FilterValuesWithRegexp("inputs", "type", regexp.MustCompile("^metric/.*")), ExtractListItem("path.p", "item", "target"), InjectIndex("index-type"), - CopyToList("t1", "t2", false), - CopyAllToList("t2", false, "a", "b"), + InjectStreamProcessor("insert_after", "index-type"), + CopyToList("t1", "t2", "insert_after"), + CopyAllToList("t2", "insert_before", "a", "b"), ) y := `- rename: @@ -609,16 +610,19 @@ func TestSerialization(t *testing.T) { to: target - inject_index: type: index-type +- inject_stream_processor: + type: index-type + on_conflict: insert_after - copy_to_list: item: t1 to: t2 - overwrite: false + on_conflict: insert_after - copy_all_to_list: to: t2 except: - a - b - overwrite: false + on_conflict: insert_before ` t.Run("serialize_rules", func(t *testing.T) { diff --git a/x-pack/elastic-agent/spec/filebeat.yml b/x-pack/elastic-agent/spec/filebeat.yml index a5cda0c2382..232ac9b12bb 100644 --- a/x-pack/elastic-agent/spec/filebeat.yml +++ b/x-pack/elastic-agent/spec/filebeat.yml @@ -6,6 +6,10 @@ rules: - inject_index: type: logs +- inject_stream_processor: + on_conflict: insert_after + type: logs + - extract_list_items: path: datasources item: inputs @@ -16,8 +20,13 @@ rules: rules: - copy_all_to_list: to: streams - overwrite: false - except: ["streams", "id", "enabled", "title", "package", "namespace", "constraints", "use_output"] + on_conflict: noop + except: ["streams", "id", "enabled", "processors"] + - copy_to_list: + item: processors + to: streams + on_conflict: insert_before + - extract_list_items: path: inputsstreams diff --git a/x-pack/elastic-agent/spec/metricbeat.yml b/x-pack/elastic-agent/spec/metricbeat.yml index 515b23f3035..b9085c8fbb6 100644 --- a/x-pack/elastic-agent/spec/metricbeat.yml +++ b/x-pack/elastic-agent/spec/metricbeat.yml @@ -10,6 +10,10 @@ rules: - inject_index: type: metrics +- inject_stream_processor: + on_conflict: insert_after + type: metrics + - extract_list_items: path: datasources item: inputs @@ -20,15 +24,18 @@ rules: rules: - copy_all_to_list: to: streams - overwrite: false - except: ["streams", "id", "enabled"] + on_conflict: noop + except: ["streams", "id", "enabled", "processors"] + - copy_to_list: + item: processors + to: streams + on_conflict: insert_before - extract_list_items: path: inputsstreams item: streams to: inputs - - filter_values_with_regexp: key: type re: ^.+/metrics$