diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index d3fd1fa65d5d..619850f4bc82 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -28,6 +28,7 @@ - Fix Windows service installation script {pull}20203[20203] - Fix timeout issue stopping service applications {pull}20256[20256] - Fix incorrect hash when upgrading agent {pull}22322[22322] +- Fix refresh of monitoring configuration {pull}23619[23619] - Fixed nil pointer during unenroll {pull}23609[23609] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator.go b/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator.go index 920b1a4b5bff..ab9ff6bbc63c 100644 --- a/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator.go +++ b/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator.go @@ -5,6 +5,7 @@ package application import ( + "crypto/md5" "fmt" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" @@ -15,6 +16,7 @@ import ( const ( monitoringName = "FLEET_MONITORING" programsKey = "programs" + monitoringChecksumKey = "monitoring_checksum" monitoringKey = "agent.monitoring" monitoringUseOutputKey = "agent.monitoring.use_output" monitoringOutputFormatKey = "outputs.%s" @@ -74,12 +76,15 @@ func injectMonitoring(agentInfo *info.AgentInfo, outputGroup string, rootAst *tr } programList := make([]string, 0, len(programsToRun)) + cfgHash := md5.New() for _, p := range programsToRun { programList = append(programList, p.Spec.Cmd) + cfgHash.Write(p.Config.Hash()) } - // making program list part of the config + // making program list and their hashes part of the config // so it will get regenerated with every change config[programsKey] = programList + config[monitoringChecksumKey] = fmt.Sprintf("%x", cfgHash.Sum(nil)) monitoringProgram.Config, err = transpiler.NewAST(config) if err != nil { diff --git a/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator_test.go b/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator_test.go index 6a3be4100be2..e23027e62fc2 100644 --- a/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/monitoring_decorator_test.go @@ -5,6 +5,7 @@ package application import ( + "fmt" "testing" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" @@ -27,6 +28,10 @@ func TestMonitoringInjection(t *testing.T) { t.Fatal(err) } + if len(programsToRun) != 1 { + t.Fatal(fmt.Errorf("programsToRun expected to have %d entries", 1)) + } + GROUPLOOP: for group, ptr := range programsToRun { programsCount := len(ptr) @@ -102,6 +107,10 @@ func TestMonitoringInjectionDefaults(t *testing.T) { t.Fatal(err) } + if len(programsToRun) != 1 { + t.Fatal(fmt.Errorf("programsToRun expected to have %d entries", 1)) + } + GROUPLOOP: for group, ptr := range programsToRun { programsCount := len(ptr) @@ -177,6 +186,10 @@ func TestMonitoringInjectionDisabled(t *testing.T) { t.Fatal(err) } + if len(programsToRun) != 2 { + t.Fatal(fmt.Errorf("programsToRun expected to have %d entries", 2)) + } + GROUPLOOP: for group, ptr := range programsToRun { programsCount := len(ptr) @@ -203,19 +216,19 @@ GROUPLOOP: } // is enabled set - settingsObj, found := cm["settings"] + agentObj, found := cm["agent"] if !found { t.Errorf("settings not found for '%s(%s)': %v", group, p.Spec.Name, cm) continue GROUPLOOP } - settingsMap, ok := settingsObj.(map[string]interface{}) + agentMap, ok := agentObj.(map[string]interface{}) if !ok { t.Errorf("settings not a map for '%s(%s)': %v", group, p.Spec.Name, cm) continue GROUPLOOP } - monitoringObj, found := settingsMap["monitoring"] + monitoringObj, found := agentMap["monitoring"] if !found { t.Errorf("agent.monitoring not found for '%s(%s)': %v", group, p.Spec.Name, cm) continue GROUPLOOP @@ -247,6 +260,97 @@ GROUPLOOP: } } +func TestChangeInMonitoringWithChangeInInput(t *testing.T) { + agentInfo, err := info.NewAgentInfo() + if err != nil { + t.Fatal(err) + } + + astBefore, err := transpiler.NewAST(inputChange1) + if err != nil { + t.Fatal(err) + } + + programsToRunBefore, err := program.Programs(agentInfo, astBefore) + if err != nil { + t.Fatal(err) + } + + if len(programsToRunBefore) != 1 { + t.Fatal(fmt.Errorf("programsToRun expected to have %d entries", 1)) + } + + astAfter, err := transpiler.NewAST(inputChange2) + if err != nil { + t.Fatal(err) + } + + programsToRunAfter, err := program.Programs(agentInfo, astAfter) + if err != nil { + t.Fatal(err) + } + + if len(programsToRunAfter) != 1 { + t.Fatal(fmt.Errorf("programsToRun expected to have %d entries", 1)) + } + + // inject to both + var hashConfigBefore, hashConfigAfter string +GROUPLOOPBEFORE: + for group, ptr := range programsToRunBefore { + programsCount := len(ptr) + newPtr, err := injectMonitoring(agentInfo, group, astBefore, ptr) + if err != nil { + t.Error(err) + continue GROUPLOOPBEFORE + } + + if programsCount+1 != len(newPtr) { + t.Errorf("incorrect programs to run count, expected: %d, got %d", programsCount+1, len(newPtr)) + continue GROUPLOOPBEFORE + } + + for _, p := range newPtr { + if p.Spec.Name != monitoringName { + continue + } + + hashConfigBefore = p.Config.HashStr() + } + } + +GROUPLOOPAFTER: + for group, ptr := range programsToRunAfter { + programsCount := len(ptr) + newPtr, err := injectMonitoring(agentInfo, group, astAfter, ptr) + if err != nil { + t.Error(err) + continue GROUPLOOPAFTER + } + + if programsCount+1 != len(newPtr) { + t.Errorf("incorrect programs to run count, expected: %d, got %d", programsCount+1, len(newPtr)) + continue GROUPLOOPAFTER + } + + for _, p := range newPtr { + if p.Spec.Name != monitoringName { + continue + } + + hashConfigAfter = p.Config.HashStr() + } + } + + if hashConfigAfter == "" || hashConfigBefore == "" { + t.Fatal("hash configs uninitialized") + } + + if hashConfigAfter == hashConfigBefore { + t.Fatal("hash config equal, expected to be different") + } +} + var inputConfigMap = map[string]interface{}{ "agent.monitoring": map[string]interface{}{ "enabled": true, @@ -279,40 +383,33 @@ var inputConfigMap = map[string]interface{}{ "username": "monitoring-uname", }, }, - "datasources": []map[string]interface{}{ - map[string]interface{}{ - "inputs": []map[string]interface{}{ + "inputs": []map[string]interface{}{ + { + "type": "log", + "use_output": "infosec1", + "streams": []map[string]interface{}{ + {"paths": "/xxxx"}, + }, + "processors": []interface{}{ map[string]interface{}{ - "type": "log", - "streams": []map[string]interface{}{ - map[string]interface{}{"paths": "/xxxx"}, - }, - "processors": []interface{}{ - map[string]interface{}{ - "dissect": map[string]interface{}{ - "tokenizer": "---", - }, - }, + "dissect": map[string]interface{}{ + "tokenizer": "---", }, }, }, }, - map[string]interface{}{ - "inputs": []map[string]interface{}{ - map[string]interface{}{ - "type": "system/metrics", - "streams": []map[string]interface{}{ - map[string]interface{}{ - "id": "system/metrics-system.core", - "enabled": true, - "dataset": "system.core", - "period": "10s", - "metrics": []string{"percentages"}, - }, - }, + { + "type": "system/metrics", + "use_output": "infosec1", + "streams": []map[string]interface{}{ + { + "id": "system/metrics-system.core", + "enabled": true, + "dataset": "system.core", + "period": "10s", + "metrics": []string{"percentages"}, }, }, - "use_output": "infosec1", }, }, } @@ -343,40 +440,34 @@ var inputConfigMapDefaults = map[string]interface{}{ "username": "monitoring-uname", }, }, - "datasources": []map[string]interface{}{ - map[string]interface{}{ - "inputs": []map[string]interface{}{ + + "inputs": []map[string]interface{}{ + { + "type": "log", + "use_output": "infosec1", + "streams": []map[string]interface{}{ + {"paths": "/xxxx"}, + }, + "processors": []interface{}{ map[string]interface{}{ - "type": "log", - "streams": []map[string]interface{}{ - map[string]interface{}{"paths": "/xxxx"}, - }, - "processors": []interface{}{ - map[string]interface{}{ - "dissect": map[string]interface{}{ - "tokenizer": "---", - }, - }, + "dissect": map[string]interface{}{ + "tokenizer": "---", }, }, }, }, - map[string]interface{}{ - "inputs": []map[string]interface{}{ - map[string]interface{}{ - "type": "system/metrics", - "streams": []map[string]interface{}{ - map[string]interface{}{ - "id": "system/metrics-system.core", - "enabled": true, - "dataset": "system.core", - "period": "10s", - "metrics": []string{"percentages"}, - }, - }, + { + "type": "system/metrics", + "use_output": "infosec1", + "streams": []map[string]interface{}{ + { + "id": "system/metrics-system.core", + "enabled": true, + "dataset": "system.core", + "period": "10s", + "metrics": []string{"percentages"}, }, }, - "use_output": "infosec1", }, }, } @@ -410,40 +501,114 @@ var inputConfigMapDisabled = map[string]interface{}{ "username": "monitoring-uname", }, }, - "datasources": []map[string]interface{}{ - map[string]interface{}{ - "inputs": []map[string]interface{}{ + + "inputs": []map[string]interface{}{ + { + "type": "log", + "streams": []map[string]interface{}{ + {"paths": "/xxxx"}, + }, + "processors": []interface{}{ map[string]interface{}{ - "type": "log", - "streams": []map[string]interface{}{ - map[string]interface{}{"paths": "/xxxx"}, + "dissect": map[string]interface{}{ + "tokenizer": "---", }, - "processors": []interface{}{ - map[string]interface{}{ - "dissect": map[string]interface{}{ - "tokenizer": "---", - }, - }, + }, + }, + }, + { + "type": "system/metrics", + "use_output": "infosec1", + "streams": []map[string]interface{}{ + { + "id": "system/metrics-system.core", + "enabled": true, + "dataset": "system.core", + "period": "10s", + "metrics": []string{"percentages"}, + }, + }, + }, + }, +} + +var inputChange1 = map[string]interface{}{ + "agent.monitoring": map[string]interface{}{ + "enabled": true, + "logs": true, + "metrics": true, + "use_output": "monitoring", + }, + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "index_name": "general", + "pass": "xxx", + "type": "elasticsearch", + "url": "xxxxx", + "username": "xxx", + }, + "monitoring": map[string]interface{}{ + "type": "elasticsearch", + "index_name": "general", + "pass": "xxx", + "url": "xxxxx", + "username": "monitoring-uname", + }, + }, + "inputs": []map[string]interface{}{ + { + "type": "log", + "streams": []map[string]interface{}{ + {"paths": "/xxxx"}, + }, + "processors": []interface{}{ + map[string]interface{}{ + "dissect": map[string]interface{}{ + "tokenizer": "---", }, }, }, }, - map[string]interface{}{ - "inputs": []map[string]interface{}{ + }, +} + +var inputChange2 = map[string]interface{}{ + "agent.monitoring": map[string]interface{}{ + "enabled": true, + "logs": true, + "metrics": true, + "use_output": "monitoring", + }, + "outputs": map[string]interface{}{ + "default": map[string]interface{}{ + "index_name": "general", + "pass": "xxx", + "type": "elasticsearch", + "url": "xxxxx", + "username": "xxx", + }, + "monitoring": map[string]interface{}{ + "type": "elasticsearch", + "index_name": "general", + "pass": "xxx", + "url": "xxxxx", + "username": "monitoring-uname", + }, + }, + "inputs": []map[string]interface{}{ + { + "type": "log", + "streams": []map[string]interface{}{ + {"paths": "/xxxx"}, + {"paths": "/yyyy"}, + }, + "processors": []interface{}{ map[string]interface{}{ - "type": "system/metrics", - "streams": []map[string]interface{}{ - map[string]interface{}{ - "id": "system/metrics-system.core", - "enabled": true, - "dataset": "system.core", - "period": "10s", - "metrics": []string{"percentages"}, - }, + "dissect": map[string]interface{}{ + "tokenizer": "---", }, }, }, - "use_output": "infosec1", }, }, } diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index c6a5492133fc..882f9efdf14b 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -129,8 +129,9 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [ watchLogs := o.monitor.WatchLogs() watchMetrics := o.monitor.WatchMetrics() - // generate only on change - if watchLogs != o.isMonitoringLogs() { + // generate only when monitoring is running (for config refresh) or + // state changes (turning on/off) + if watchLogs != o.isMonitoringLogs() || watchLogs { fbConfig, any := o.getMonitoringFilebeatConfig(output) stepID := configrequest.StepRun if !watchLogs || !any { @@ -151,7 +152,7 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}) [ steps = append(steps, filebeatStep) } - if watchMetrics != o.isMonitoringMetrics() { + if watchMetrics != o.isMonitoringMetrics() || watchMetrics { mbConfig, any := o.getMonitoringMetricbeatConfig(output) stepID := configrequest.StepRun if !watchMetrics || !any { @@ -552,7 +553,19 @@ func (o *Operator) getMetricbeatEndpoints() map[string][]string { for _, a := range o.apps { metricEndpoint := a.Monitor().MetricsPathPrefixed(a.Spec(), o.pipelineID) if metricEndpoint != "" { - endpoints[strings.ReplaceAll(a.Name(), "-", "_")] = append(endpoints[a.Name()], metricEndpoint) + safeName := strings.ReplaceAll(a.Name(), "-", "_") + // prevent duplicates + var found bool + for _, ep := range endpoints[safeName] { + if ep == metricEndpoint { + found = true + break + } + } + + if !found { + endpoints[safeName] = append(endpoints[safeName], metricEndpoint) + } } } diff --git a/x-pack/elastic-agent/pkg/agent/stateresolver/stateresolver.go b/x-pack/elastic-agent/pkg/agent/stateresolver/stateresolver.go index 61ba348f760b..34bb76e551f4 100644 --- a/x-pack/elastic-agent/pkg/agent/stateresolver/stateresolver.go +++ b/x-pack/elastic-agent/pkg/agent/stateresolver/stateresolver.go @@ -47,6 +47,10 @@ func (s *StateResolver) Resolve( s.l.Infof("New State ID is %s", newStateID) s.l.Infof("Converging state requires execution of %d step(s)", len(steps)) + for i, step := range steps { + // more detailed debug log + s.l.Debugf("step %d: %s", i, step.String()) + } // Allow the operator to ack the should state when applying the steps is done correctly. ack := func() {