Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Agent] Expose stream.* data in every event #17468

Merged
merged 10 commits into from
Apr 14, 2020
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@
- Introduced post install hooks {pull}17241[17241]
- Support for config constraints {pull}17112[17112]
- Display the stability of the agent at enroll and start. {pull}17336[17336]
- Expose stream.* variables in events {pull}17468[17468]
3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/program/program_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v2"
Expand Down Expand Up @@ -492,7 +493,7 @@ func TestConfiguration(t *testing.T) {
require.NoError(t, err)
var m map[string]interface{}
err = yamltest.FromYAML(programConfig, &m)
require.NoError(t, err)
require.NoError(t, errors.Wrap(err, program.Cmd()))

compareMap := &transpiler.MapVisitor{}
program.Config.Accept(compareMap)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ filebeat:
index: logs-generic-default
vars:
var: value
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,25 @@ metricbeat:
metricsets: [status]
index: metrics-docker.status-default
hosts: ["http://127.0.0.1:8080"]
processors:
- add_fields:
fields:
stream.type: metrics
stream.dataset: docker.status
stream.namespace: default
- module: apache
metricsets: [info]
index: metrics-generic-testing
hosts: ["http://apache.remote"]
processors:
- add_fields:
fields:
should_be: first
- add_fields:
fields:
stream.type: metrics
stream.dataset: generic
stream.namespace: testing
output:
elasticsearch:
hosts: [127.0.0.1:9200, 127.0.0.1:9300]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ datasources:
use_output: default
inputs:
- type: apache/metrics
processors:
- add_fields:
fields:
should_be: first
streams:
- enabled: true
metricset: info
Expand Down
95 changes: 95 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/merge_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package transpiler

import "fmt"

type injector interface {
Inject(target []Node, source interface{}) []Node
InjectItem(target []Node, source Node) []Node
InjectCollection(target []Node, source []Node) []Node
}

func mergeStrategy(strategy string) injector {

switch strategy {
case "insert_before":
return injectBeforeInjector{}
case "insert_after":
return injectAfterInjector{}
case "replace":
return replaceInjector{}
case "noop":
return noopInjector{}
}

return injectAfterInjector{}
}

type noopInjector struct{}

func (i noopInjector) Inject(target []Node, source interface{}) []Node {
return inject(i, target, source)
}

func (noopInjector) InjectItem(target []Node, source Node) []Node { return target }

func (noopInjector) InjectCollection(target []Node, source []Node) []Node { return target }

type injectAfterInjector struct{}

func (i injectAfterInjector) Inject(target []Node, source interface{}) []Node {
return inject(i, target, source)
}

func (injectAfterInjector) InjectItem(target []Node, source Node) []Node {
return append(target, source)
}

func (injectAfterInjector) InjectCollection(target []Node, source []Node) []Node {
return append(target, source...)
}

type injectBeforeInjector struct{}

func (i injectBeforeInjector) Inject(target []Node, source interface{}) []Node {
return inject(i, target, source)
}

func (injectBeforeInjector) InjectItem(target []Node, source Node) []Node {
return append([]Node{source}, target...)
}

func (injectBeforeInjector) InjectCollection(target []Node, source []Node) []Node {
return append(source, target...)
}

type replaceInjector struct{}

func (i replaceInjector) Inject(target []Node, source interface{}) []Node {
return inject(i, target, source)
}

func (replaceInjector) InjectItem(target []Node, source Node) []Node {
return []Node{source}
}

func (replaceInjector) InjectCollection(target []Node, source []Node) []Node {
return source
}

func inject(i injector, target []Node, source interface{}) []Node {
if sourceCollection, ok := source.([]Node); ok {
fmt.Printf(">>[%T] list of nodes %T %d\n", i, source, len(sourceCollection))
return i.InjectCollection(target, sourceCollection)
}

if node, ok := source.(Node); ok {
fmt.Printf(">> one of nodes %T\n", source)
return i.InjectItem(target, node)
}

return target
}
Loading