diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 94231c9e7c59..73e076c51761 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -33,6 +33,7 @@ - Fix default configuration after enroll {pull}18232[18232] - Fix make sure the collected logs or metrics include streams information. {pull}18261[18261] - Stop monitoring on config change {pull}18284[18284] +- Enable more granular control of monitoring {pull}18346[18346] - Fix jq: command not found {pull}18408[18408] - Avoid Chown on windows {pull}18512[18512] - Remove fleet admin from setup script {pull}18611[18611] diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index 5c1fcb6d79c4..de852f7b13ad 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -16,12 +16,14 @@ const ( monitoringName = "FLEET_MONITORING" outputKey = "output" monitoringEnabledSubkey = "enabled" + logsProcessName = "filebeat" + metricsProcessName = "metricbeat" ) func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { // if monitoring is disabled and running stop it if !o.monitor.IsMonitoringEnabled() { - if o.isMonitoring { + if o.isMonitoring != 0 { o.logger.Info("operator.handleStartSidecar: monitoring is running and disabled, proceeding to stop") return o.handleStopSidecar(s) } @@ -30,9 +32,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { return nil } - o.isMonitoring = true - - for _, step := range o.getMonitoringSteps(s, o.monitor.WatchLogs(), o.monitor.WatchMetrics()) { + for _, step := range o.getMonitoringSteps(s) { p, cfg, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags()) if err != nil { return errors.New(err, @@ -45,10 +45,14 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { if step.ID == configrequest.StepRemove { if err := o.stop(p); err != nil { result = multierror.Append(err, err) + } else { + o.markStopMonitoring(step.Process) } } else { if err := o.start(p, cfg); err != nil { result = multierror.Append(err, err) + } else { + o.markStartMonitoring(step.Process) } } } @@ -57,7 +61,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { } func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) { - for _, step := range o.getMonitoringSteps(s, true, true) { + for _, step := range o.getMonitoringSteps(s) { p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags()) if err != nil { return errors.New(err, @@ -69,15 +73,11 @@ func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) { o.logger.Debugf("stopping program %v", p) if err := o.stop(p); err != nil { result = multierror.Append(err, err) + } else { + o.markStopMonitoring(step.Process) } } - // if result != nil then something might be still running, setting isMonitoring to false - // will prevent tearing it down in a future - if result == nil { - o.isMonitoring = false - } - return result } @@ -87,7 +87,7 @@ func monitoringTags() map[app.Tag]string { } } -func (o *Operator) getMonitoringSteps(step configrequest.Step, watchLogs, watchMetrics bool) []configrequest.Step { +func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.Step { // get output config, err := getConfigFromStep(step) if err != nil { @@ -113,22 +113,25 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step, watchLogs, watchM return nil } - return o.generateMonitoringSteps(step.Version, output, watchLogs, watchMetrics) + return o.generateMonitoringSteps(step.Version, output) } -func (o *Operator) generateMonitoringSteps(version string, output interface{}, watchLogs, watchMetrics bool) []configrequest.Step { +func (o *Operator) generateMonitoringSteps(version string, output interface{}) []configrequest.Step { var steps []configrequest.Step + watchLogs := o.monitor.WatchLogs() + watchMetrics := o.monitor.WatchMetrics() - if watchLogs { + // generate only on change + if watchLogs != o.isMonitoringLogs() { fbConfig, any := o.getMonitoringFilebeatConfig(output) stepID := configrequest.StepRun - if !any { + if !watchLogs || !any { stepID = configrequest.StepRemove } filebeatStep := configrequest.Step{ ID: stepID, Version: version, - Process: "filebeat", + Process: logsProcessName, Meta: map[string]interface{}{ configrequest.MetaConfigKey: fbConfig, }, @@ -136,18 +139,17 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}, w steps = append(steps, filebeatStep) } - - if watchMetrics { + if watchMetrics != o.isMonitoringMetrics() { mbConfig, any := o.getMonitoringMetricbeatConfig(output) stepID := configrequest.StepRun - if !any { + if !watchMetrics || !any { stepID = configrequest.StepRemove } metricbeatStep := configrequest.Step{ ID: stepID, Version: version, - Process: "metricbeat", + Process: metricsProcessName, Meta: map[string]interface{}{ configrequest.MetaConfigKey: mbConfig, }, @@ -273,3 +275,29 @@ func (o *Operator) getMetricbeatEndpoints() []string { return endpoints } + +func (o *Operator) markStopMonitoring(process string) { + switch process { + case logsProcessName: + o.isMonitoring ^= isMonitoringLogsFlag + case metricsProcessName: + o.isMonitoring ^= isMonitoringMetricsFlag + } +} + +func (o *Operator) markStartMonitoring(process string) { + switch process { + case logsProcessName: + o.isMonitoring |= isMonitoringLogsFlag + case metricsProcessName: + o.isMonitoring |= isMonitoringMetricsFlag + } +} + +func (o *Operator) isMonitoringLogs() bool { + return (o.isMonitoring & isMonitoringLogsFlag) != 0 +} + +func (o *Operator) isMonitoringMetrics() bool { + return (o.isMonitoring & isMonitoringMetricsFlag) != 0 +} diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index e313a1a07b95..826e5bbd03d6 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -43,7 +43,7 @@ func TestGenerateSteps(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { m := &testMonitor{monitorLogs: tc.Config.MonitorLogs, monitorMetrics: tc.Config.MonitorMetrics} operator, _ := getMonitorableTestOperator(t, "tests/scripts", m) - steps := operator.generateMonitoringSteps("8.0", sampleOutput, tc.Config.MonitorLogs, tc.Config.MonitorMetrics) + steps := operator.generateMonitoringSteps("8.0", sampleOutput) if actualSteps := len(steps); actualSteps != tc.ExpectedSteps { t.Fatalf("invalid number of steps, expected %v, got %v", tc.ExpectedSteps, actualSteps) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index e8ce81f2c406..22a6eb1a388e 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -26,6 +26,11 @@ import ( rconfig "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig/grpc" ) +const ( + isMonitoringMetricsFlag = 1 << 0 + isMonitoringLogsFlag = 1 << 1 +) + // Operator runs Start/Stop/Update operations // it is responsible for detecting reconnect to existing processes // based on backed up configuration @@ -40,7 +45,7 @@ type Operator struct { stateResolver *stateresolver.StateResolver eventProcessor callbackHooks monitor monitoring.Monitor - isMonitoring bool + isMonitoring int apps map[string]Application appsLock sync.Mutex