Skip to content

Commit

Permalink
[Agent] Expose stream.* data in every event (elastic#17468)
Browse files Browse the repository at this point in the history
[Agent] Expose stream.* data in every event (elastic#17468)

(cherry picked from commit 70fba87)
  • Loading branch information
michalpristas committed Apr 15, 2020
1 parent 4de9e9c commit de077ee
Show file tree
Hide file tree
Showing 14 changed files with 311 additions and 33 deletions.
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,5 @@
- Display the stability of the agent at enroll and start. {pull}17336[17336]
- Support for config constraints {pull}17112[17112]
- Introduced post install hooks {pull}17241[17241]
- Expose stream.* variables in events {pull}17468[17468]

3 changes: 2 additions & 1 deletion x-pack/elastic-agent/pkg/agent/program/program_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v2"
Expand Down Expand Up @@ -492,7 +493,7 @@ func TestConfiguration(t *testing.T) {
require.NoError(t, err)
var m map[string]interface{}
err = yamltest.FromYAML(programConfig, &m)
require.NoError(t, err)
require.NoError(t, errors.Wrap(err, program.Cmd()))

compareMap := &transpiler.MapVisitor{}
program.Config.Accept(compareMap)
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ filebeat:
- /var/log/hello1.log
- /var/log/hello2.log
index: logs-generic-default
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ filebeat:
index: logs-generic-default
vars:
var: value
processors:
- add_fields:
fields:
stream.type: logs
stream.dataset: generic
stream.namespace: default
output:
elasticsearch:
hosts:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,25 @@ metricbeat:
metricsets: [status]
index: metrics-docker.status-default
hosts: ["http://127.0.0.1:8080"]
processors:
- add_fields:
fields:
stream.type: metrics
stream.dataset: docker.status
stream.namespace: default
- module: apache
metricsets: [info]
index: metrics-generic-testing
hosts: ["http://apache.remote"]
processors:
- add_fields:
fields:
should_be: first
- add_fields:
fields:
stream.type: metrics
stream.dataset: generic
stream.namespace: testing
output:
elasticsearch:
hosts: [127.0.0.1:9200, 127.0.0.1:9300]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ datasources:
use_output: default
inputs:
- type: apache/metrics
processors:
- add_fields:
fields:
should_be: first
streams:
- enabled: true
metricset: info
Expand Down
95 changes: 95 additions & 0 deletions x-pack/elastic-agent/pkg/agent/transpiler/merge_strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package transpiler

import "fmt"

type injector interface {
Inject(target []Node, source interface{}) []Node
InjectItem(target []Node, source Node) []Node
InjectCollection(target []Node, source []Node) []Node
}

func mergeStrategy(strategy string) injector {

switch strategy {
case "insert_before":
return injectBeforeInjector{}
case "insert_after":
return injectAfterInjector{}
case "replace":
return replaceInjector{}
case "noop":
return noopInjector{}
}

return injectAfterInjector{}
}

type noopInjector struct{}

func (i noopInjector) Inject(target []Node, source interface{}) []Node {
return inject(i, target, source)
}

func (noopInjector) InjectItem(target []Node, source Node) []Node { return target }

func (noopInjector) InjectCollection(target []Node, source []Node) []Node { return target }

type injectAfterInjector struct{}

func (i injectAfterInjector) Inject(target []Node, source interface{}) []Node {
return inject(i, target, source)
}

func (injectAfterInjector) InjectItem(target []Node, source Node) []Node {
return append(target, source)
}

func (injectAfterInjector) InjectCollection(target []Node, source []Node) []Node {
return append(target, source...)
}

type injectBeforeInjector struct{}

func (i injectBeforeInjector) Inject(target []Node, source interface{}) []Node {
return inject(i, target, source)
}

func (injectBeforeInjector) InjectItem(target []Node, source Node) []Node {
return append([]Node{source}, target...)
}

func (injectBeforeInjector) InjectCollection(target []Node, source []Node) []Node {
return append(source, target...)
}

type replaceInjector struct{}

func (i replaceInjector) Inject(target []Node, source interface{}) []Node {
return inject(i, target, source)
}

func (replaceInjector) InjectItem(target []Node, source Node) []Node {
return []Node{source}
}

func (replaceInjector) InjectCollection(target []Node, source []Node) []Node {
return source
}

func inject(i injector, target []Node, source interface{}) []Node {
if sourceCollection, ok := source.([]Node); ok {
fmt.Printf(">>[%T] list of nodes %T %d\n", i, source, len(sourceCollection))
return i.InjectCollection(target, sourceCollection)
}

if node, ok := source.(Node); ok {
fmt.Printf(">> one of nodes %T\n", source)
return i.InjectItem(target, node)
}

return target
}
Loading

0 comments on commit de077ee

Please sign in to comment.