Skip to content

Commit

Permalink
Add support for Logstash as a valid Agent destination (elastic#24305)
Browse files Browse the repository at this point in the history
* Add support for Logstash as a valid Agent destination

This PR fixes an issue to use an logstash output as a monitoring
destination, this correctly uses the type defined in the output to
generate the appropriate output configuration for the monitoring
processes.
  • Loading branch information
ph authored Jun 23, 2021
1 parent bd62d2f commit 92319f5
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 68 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
- Agent sends wrong log level to Endpoint {issue}25583[25583]
- Fix startup with failing configuration {pull}26057[26057]
- Change timestamp in elatic-agent-json.log to use UTC {issue}25391[25391]
- Fix add support for Logstash output. {pull}24305[24305]

==== New features

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,18 @@ func InjectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *tr
}

// get monitoring output name to be used
monitoringOutputName := defaultOutputName
useOutputNode, found := transpiler.Lookup(rootAst, monitoringUseOutputKey)
if found {
monitoringOutputNameKey, ok := useOutputNode.Value().(*transpiler.StrVal)
if !ok {
return programsToRun, nil
}

monitoringOutputName = monitoringOutputNameKey.String()
monitoringOutputName, found := transpiler.LookupString(rootAst, monitoringUseOutputKey)
if !found {
monitoringOutputName = defaultOutputName
}

typeValue, found := transpiler.LookupString(rootAst, fmt.Sprintf("%s.%s.type", outputsKey, monitoringOutputName))
if !found {
typeValue = elasticsearchKey
}

ast := rootAst.Clone()
if err := getMonitoringRule(monitoringOutputName).Apply(agentInfo, ast); err != nil {
if err := getMonitoringRule(monitoringOutputName, typeValue).Apply(agentInfo, ast); err != nil {
return programsToRun, err
}

Expand Down Expand Up @@ -95,11 +94,11 @@ func InjectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *tr
return append(programsToRun, monitoringProgram), nil
}

func getMonitoringRule(outputName string) *transpiler.RuleList {
func getMonitoringRule(outputName string, t string) *transpiler.RuleList {
monitoringOutputSelector := fmt.Sprintf(monitoringOutputFormatKey, outputName)
return transpiler.NewRuleList(
transpiler.Copy(monitoringOutputSelector, outputKey),
transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), elasticsearchKey),
transpiler.Rename(fmt.Sprintf("%s.%s", outputsKey, outputName), t),
transpiler.Filter(monitoringKey, programsKey, outputKey),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,85 @@ GROUPLOOP:
}
}

func TestMonitoringToLogstashInjection(t *testing.T) {
agentInfo, err := info.NewAgentInfo(true)
if err != nil {
t.Fatal(err)
}
ast, err := transpiler.NewAST(inputConfigLS)
if err != nil {
t.Fatal(err)
}

programsToRun, err := program.Programs(agentInfo, ast)
if err != nil {
t.Fatal(err)
}

if len(programsToRun) != 1 {
t.Fatal(fmt.Errorf("programsToRun expected to have %d entries", 1))
}

GROUPLOOP:
for group, ptr := range programsToRun {
programsCount := len(ptr)
newPtr, err := InjectMonitoring(agentInfo, group, ast, ptr)
if err != nil {
t.Error(err)
continue GROUPLOOP
}

if programsCount+1 != len(newPtr) {
t.Errorf("incorrect programs to run count, expected: %d, got %d", programsCount+1, len(newPtr))
continue GROUPLOOP
}

for _, p := range newPtr {
if p.Spec.Name != MonitoringName {
continue
}

cm, err := p.Config.Map()
if err != nil {
t.Error(err)
continue GROUPLOOP
}

outputCfg, found := cm[outputKey]
if !found {
t.Errorf("output not found for '%s'", group)
continue GROUPLOOP
}

outputMap, ok := outputCfg.(map[string]interface{})
if !ok {
t.Errorf("output is not a map for '%s'", group)
continue GROUPLOOP
}

esCfg, found := outputMap["logstash"]
if !found {
t.Errorf("logstash output not found for '%s' %v", group, outputMap)
continue GROUPLOOP
}

esMap, ok := esCfg.(map[string]interface{})
if !ok {
t.Errorf("output.logstash is not a map for '%s'", group)
continue GROUPLOOP
}

if uname, found := esMap["hosts"]; !found {
t.Errorf("output.logstash.hosts output not found for '%s'", group)
continue GROUPLOOP
} else if uname != "192.168.1.2" {
t.Errorf("output.logstash.hosts has incorrect value expected '%s', got '%s for %s", "monitoring-uname", uname, group)
continue GROUPLOOP
}
}
}
}

func TestMonitoringInjectionDisabled(t *testing.T) {
agentInfo, err := info.NewAgentInfo(true)
if err != nil {
Expand Down Expand Up @@ -613,42 +692,40 @@ var inputChange2 = map[string]interface{}{
},
}

// const inputConfig = `outputs:
// default:
// index_name: general
// pass: xxx
// type: es
// url: xxxxx
// username: xxx
// infosec1:
// pass: xxx
// spool:
// file: "${path.data}/spool.dat"
// type: es
// url: xxxxx
// username: xxx
// streams:
// -
// output:
// override:
// index_name: my_service_logs
// ingest_pipeline: process_logs
// path: /xxxx
// processors:
// -
// dissect:
// tokenizer: "---"
// type: log
// -
// output:
// index_name: mysql_access_logs
// path: /xxxx
// type: log
// -
// output:
// index_name: mysql_metrics
// use_output: infosec1
// pass: yyy
// type: metrics/system
// username: xxxx
// `
var inputConfigLS = map[string]interface{}{
"agent.monitoring": map[string]interface{}{
"enabled": true,
"logs": true,
"metrics": true,
"use_output": "monitoring",
},
"outputs": map[string]interface{}{
"default": map[string]interface{}{
"index_name": "general",
"pass": "xxx",
"type": "elasticsearch",
"url": "xxxxx",
"username": "xxx",
},
"monitoring": map[string]interface{}{
"type": "logstash",
"hosts": "192.168.1.2",
"ssl.certificate_authorities": []string{"/etc/pki.key"},
},
},
"inputs": []map[string]interface{}{
{
"type": "log",
"streams": []map[string]interface{}{
{"paths": "/xxxx"},
},
"processors": []interface{}{
map[string]interface{}{
"dissect": map[string]interface{}{
"tokenizer": "---",
},
},
},
},
},
}
58 changes: 46 additions & 12 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
}

func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) {
for _, step := range o.generateMonitoringSteps(s.Version, nil) {
for _, step := range o.generateMonitoringSteps(s.Version, "", nil) {
p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags())
if err != nil {
return errors.New(err,
Expand Down Expand Up @@ -115,24 +115,57 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.S
return nil
}

output, found := outputMap["elasticsearch"]
if !found {
o.logger.Error("operator.getMonitoringSteps: monitoring is missing an elasticsearch output configuration configuration for sidecar of type: %s", step.ProgramSpec.Cmd)
if len(outputMap) == 0 {
o.logger.Errorf("operator.getMonitoringSteps: monitoring is missing an output configuration for sidecar of type: %s", step.ProgramSpec.Cmd)
return nil
}

// Guards against parser issues upstream, this should not be possible but
// since we are folding all the child options as a map we should make sure we have
//a unique output.
if len(outputMap) > 1 {
o.logger.Errorf("operator.getMonitoringSteps: monitoring has too many outputs configuration for sidecar of type: %s", step.ProgramSpec.Cmd)
return nil
}

// Aggregate output configuration independently of the received output key.
output := make(map[string]interface{})

for _, v := range outputMap {
child, ok := v.(map[string]interface{})
if !ok {
o.logger.Error("operator.getMonitoringSteps: monitoring config is not a map")
return nil
}
for c, j := range child {
output[c] = j
}
}

t, ok := output["type"]
if !ok {
o.logger.Errorf("operator.getMonitoringSteps: unknown monitoring output for sidecar of type: %s", step.ProgramSpec.Cmd)
return nil
}

return o.generateMonitoringSteps(step.Version, output)
outputType, ok := t.(string)
if !ok {
o.logger.Errorf("operator.getMonitoringSteps: unexpected monitoring output type: %+v for sidecar of type: %s", t, step.ProgramSpec.Cmd)
return nil
}

return o.generateMonitoringSteps(step.Version, outputType, output)
}

func (o *Operator) generateMonitoringSteps(version string, output interface{}) []configrequest.Step {
func (o *Operator) generateMonitoringSteps(version, outputType string, output interface{}) []configrequest.Step {
var steps []configrequest.Step
watchLogs := o.monitor.WatchLogs()
watchMetrics := o.monitor.WatchMetrics()

// generate only when monitoring is running (for config refresh) or
// state changes (turning on/off)
if watchLogs != o.isMonitoringLogs() || watchLogs {
fbConfig, any := o.getMonitoringFilebeatConfig(output)
fbConfig, any := o.getMonitoringFilebeatConfig(outputType, output)
stepID := configrequest.StepRun
if !watchLogs || !any {
stepID = configrequest.StepRemove
Expand All @@ -149,7 +182,7 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [
steps = append(steps, filebeatStep)
}
if watchMetrics != o.isMonitoringMetrics() || watchMetrics {
mbConfig, any := o.getMonitoringMetricbeatConfig(output)
mbConfig, any := o.getMonitoringMetricbeatConfig(outputType, output)
stepID := configrequest.StepRun
if !watchMetrics || !any {
stepID = configrequest.StepRemove
Expand Down Expand Up @@ -182,7 +215,7 @@ func loadSpecFromSupported(processName string) program.Spec {
}
}

func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]interface{}, bool) {
func (o *Operator) getMonitoringFilebeatConfig(outputType string, output interface{}) (map[string]interface{}, bool) {
inputs := []interface{}{
map[string]interface{}{
"type": "filestream",
Expand Down Expand Up @@ -297,12 +330,13 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
})
}
}

result := map[string]interface{}{
"filebeat": map[string]interface{}{
"inputs": inputs,
},
"output": map[string]interface{}{
"elasticsearch": output,
outputType: output,
},
}

Expand All @@ -311,7 +345,7 @@ func (o *Operator) getMonitoringFilebeatConfig(output interface{}) (map[string]i
return result, true
}

func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string]interface{}, bool) {
func (o *Operator) getMonitoringMetricbeatConfig(outputType string, output interface{}) (map[string]interface{}, bool) {
hosts := o.getMetricbeatEndpoints()
if len(hosts) == 0 {
return nil, false
Expand Down Expand Up @@ -526,7 +560,7 @@ func (o *Operator) getMonitoringMetricbeatConfig(output interface{}) (map[string
"modules": modules,
},
"output": map[string]interface{}{
"elasticsearch": output,
outputType: output,
},
}

Expand Down
11 changes: 6 additions & 5 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

func TestGenerateSteps(t *testing.T) {
const sampleOutput = "sample-output"
const outputType = "logstash"

type testCase struct {
Name string
Expand All @@ -51,7 +52,7 @@ func TestGenerateSteps(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
m := &testMonitor{monitorLogs: tc.Config.MonitorLogs, monitorMetrics: tc.Config.MonitorMetrics}
operator := getMonitorableTestOperator(t, "tests/scripts", m, tc.Config)
steps := operator.generateMonitoringSteps("8.0", sampleOutput)
steps := operator.generateMonitoringSteps("8.0", outputType, sampleOutput)
if actualSteps := len(steps); actualSteps != tc.ExpectedSteps {
t.Fatalf("invalid number of steps, expected %v, got %v", tc.ExpectedSteps, actualSteps)
}
Expand All @@ -61,13 +62,13 @@ func TestGenerateSteps(t *testing.T) {
// Filebeat step check
if s.ProgramSpec.Cmd == "filebeat" {
fbFound = true
checkStep(t, "filebeat", sampleOutput, s)
checkStep(t, "filebeat", outputType, sampleOutput, s)
}

// Metricbeat step check
if s.ProgramSpec.Cmd == "metricbeat" {
mbFound = true
checkStep(t, "metricbeat", sampleOutput, s)
checkStep(t, "metricbeat", outputType, sampleOutput, s)
}
}

Expand All @@ -82,7 +83,7 @@ func TestGenerateSteps(t *testing.T) {
}
}

func checkStep(t *testing.T, stepName string, expectedOutput interface{}, s configrequest.Step) {
func checkStep(t *testing.T, stepName string, outputType string, expectedOutput interface{}, s configrequest.Step) {
if meta := s.Meta[configrequest.MetaConfigKey]; meta != nil {
mapstr, ok := meta.(map[string]interface{})
if !ok {
Expand All @@ -94,7 +95,7 @@ func checkStep(t *testing.T, stepName string, expectedOutput interface{}, s conf
t.Fatalf("output not found for %s step", stepName)
}

if actualOutput := esOut["elasticsearch"]; actualOutput != expectedOutput {
if actualOutput := esOut[outputType]; actualOutput != expectedOutput {
t.Fatalf("output for %s step does not match. expected: %v, got %v", stepName, expectedOutput, actualOutput)
}
}
Expand Down
Loading

0 comments on commit 92319f5

Please sign in to comment.