Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Agent] Expose stream.* data in every event #17468

Merged
merged 10 commits into from
Apr 14, 2020
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@
- Introduced post install hooks {pull}17241[17241]
- Support for config constraints {pull}17112[17112]
- Display the stability of the agent at enroll and start. {pull}17336[17336]
- Expose stream.* variables in events {pull}17468[17468]
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/program/program_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v2"
Expand Down Expand Up @@ -492,7 +493,7 @@ func TestConfiguration(t *testing.T) {
require.NoError(t, err)
var m map[string]interface{}
err = yamltest.FromYAML(programConfig, &m)
require.NoError(t, err)
require.NoError(t, errors.Wrap(err, program.Cmd()))

compareMap := &transpiler.MapVisitor{}
program.Config.Accept(compareMap)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ filebeat:
index: logs-generic-default
vars:
var: value
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,25 @@ metricbeat:
metricsets: [status]
index: metrics-docker.status-default
hosts: ["http://127.0.0.1:8080"]
processors:
- add_fields:
fields:
stream.type: metrics
stream.dataset: docker.status
stream.namespace: default
- module: apache
metricsets: [info]
index: metrics-generic-testing
hosts: ["http://apache.remote"]
processors:
- add_fields:
fields:
should_be: first
- add_fields:
fields:
stream.type: metrics
stream.dataset: generic
stream.namespace: testing
output:
elasticsearch:
hosts: [127.0.0.1:9200, 127.0.0.1:9300]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ datasources:
use_output: default
inputs:
- type: apache/metrics
processors:
- add_fields:
fields:
should_be: first
streams:
- enabled: true
metricset: info
Expand Down
124 changes: 124 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
)

const (
metaIndexKey = "_meta_index"
)

// RuleList is a container that allow the same tree to be executed on multiple defined Rule.
type RuleList struct {
Rules []Rule
Expand Down Expand Up @@ -69,6 +73,8 @@ func (r *RuleList) MarshalYAML() (interface{}, error) {
name = "extract_list_items"
case *InjectIndexRule:
name = "inject_index"
case *InjectStreamProcessorRule:
name = "inject_stream_processor"
case *MakeArrayRule:
name = "make_array"
case *RemoveKeyRule:
Expand Down Expand Up @@ -145,6 +151,8 @@ func (r *RuleList) UnmarshalYAML(unmarshal func(interface{}) error) error {
r = &ExtractListItemRule{}
case "inject_index":
r = &InjectIndexRule{}
case "inject_stream_processor":
r = &InjectStreamProcessorRule{}
case "make_array":
r = &MakeArrayRule{}
case "remove_key":
Expand Down Expand Up @@ -413,6 +421,17 @@ func (r *InjectIndexRule) Apply(ast *AST) error {
name: "index",
value: &StrVal{value: fmt.Sprintf("%s-%s-%s", r.Type, dataset, namespace)},
})
streamMap.value = append(streamMap.value, &Key{
name: metaIndexKey,
value: &Dict{
value: []Node{
&Key{name: "type", value: &StrVal{value: r.Type}},
&Key{name: "dataset", value: &StrVal{value: dataset}},
&Key{name: "namespace", value: &StrVal{value: namespace}},
},
},
})

}

}
Expand All @@ -429,6 +448,111 @@ func InjectIndex(indexType string) *InjectIndexRule {
}
}

// InjectStreamProcessorRule expect target to be a collection of fields including
// _meta_index map with dataset, index and namespace keys defined.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment need to be changed since we do not have the _meta_index present in the code?

// if any key is missing this rule fails.
type InjectStreamProcessorRule struct {
Target string
}

// Apply injects processor into input.
func (r *InjectStreamProcessorRule) Apply(ast *AST) error {
const defaultNamespace = "default"
const defaultDataset = "generic"

inputsNode, found := Lookup(ast, r.Target)
if !found {
return nil
}

inputsList, ok := inputsNode.Value().(*List)
if !ok {
return nil
}

for i := range inputsList.value {
inputNode := inputsList.value[i]
metaNode, ok := inputNode.Find(metaIndexKey)
ph marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
continue
}

streamType, found := metaNode.Find("type")
if !found {
return errors.New("InjectStreamProcessorRule: stream type not found")
}

streamNamespace, found := metaNode.Find("namespace")
if !found {
return errors.New("InjectStreamProcessorRule: stream namespace not found")
}

streamDataset, found := metaNode.Find("dataset")
if !found {
return errors.New("InjectStreamProcessorRule: stream dataset not found")
}

inputDict, ok := inputNode.(*Dict)
if !ok {
return errors.New("InjectStreamProcessorRule: input node is not a map")
}

// get processors node
processorsNode, found := inputNode.Find("processors")
if !found {
processorsNode = &Key{
name: "processors",
value: &List{value: make([]Node, 0)},
}

inputDict.value = append(inputDict.value, processorsNode)
}

processorsList, ok := processorsNode.Value().(*List)
if !ok {
return errors.New("InjectStreamProcessorRule: processors is not a list")
}

processorMap := &Dict{value: make([]Node, 0)}
processorMap.value = append(processorMap.value, &Key{name: "fields", value: &Dict{value: []Node{
&Key{name: "stream.type", value: &StrVal{value: streamType.Value().(Node).String()}},
&Key{name: "stream.namespace", value: &StrVal{value: streamNamespace.Value().(Node).String()}},
&Key{name: "stream.dataset", value: &StrVal{value: streamDataset.Value().(Node).String()}},
}}})

addFieldsMap := &Dict{value: []Node{&Key{"add_fields", processorMap}}}
processorsList.value = append(processorsList.value, addFieldsMap)

// clear meta
index := -1
for i, node := range inputDict.value {
key, ok := node.(*Key)
if !ok {
continue
}

if key.name == metaIndexKey {
index = i
break
}
}

// sanity check, but should always pass
if index != -1 {
inputDict.value = append(inputDict.value[:index], inputDict.value[index+1:]...)
}
}

return nil
}

// InjectStreamProcessor creates a InjectStreamProcessorRule
func InjectStreamProcessor(target string) *InjectStreamProcessorRule {
return &InjectStreamProcessorRule{
Target: target,
}
}

// ExtractListItemRule extract items with specified name from a list of maps.
// The result is store in a new array.
// Example:
Expand Down
19 changes: 19 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,20 @@ datasources:
streams:
- paths: /var/log/mysql/error.log
index: mytype-generic-default
_meta_index:
dataset: generic
namespace: default
type: mytype
- name: Specified namespace
namespace: nsns
inputs:
- type: file
streams:
- paths: /var/log/mysql/access.log
_meta_index:
dataset: generic
namespace: nsns
type: mytype
index: mytype-generic-nsns
- name: Specified dataset
inputs:
Expand All @@ -72,6 +80,10 @@ datasources:
- paths: /var/log/mysql/access.log
dataset: dsds
index: mytype-dsds-default
_meta_index:
dataset: dsds
namespace: default
type: mytype
- name: All specified
namespace: nsns
inputs:
Expand All @@ -80,6 +92,10 @@ datasources:
- paths: /var/log/mysql/access.log
dataset: dsds
index: mytype-dsds-nsns
_meta_index:
dataset: dsds
namespace: nsns
type: mytype
`,
rule: &RuleList{
Rules: []Rule{
Expand Down Expand Up @@ -561,6 +577,7 @@ func TestSerialization(t *testing.T) {
FilterValuesWithRegexp("inputs", "type", regexp.MustCompile("^metric/.*")),
ExtractListItem("path.p", "item", "target"),
InjectIndex("index-type"),
InjectStreamProcessor("target"),
CopyToList("t1", "t2", false),
CopyAllToList("t2", false, "a", "b"),
)
Expand Down Expand Up @@ -609,6 +626,8 @@ func TestSerialization(t *testing.T) {
to: target
- inject_index:
type: index-type
- inject_stream_processor:
to: target
- copy_to_list:
item: t1
to: t2
Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/spec/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ rules:
item: streams
to: inputs

- inject_stream_processor:
target: inputs

- map:
path: inputs
rules:
Expand Down
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/spec/metricbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ rules:
item: streams
to: inputs

- inject_stream_processor:
target: inputs

- filter_values_with_regexp:
key: type
Expand Down