From 4a239a99c1fe84e78bdffe1fbef4b6b1f7fdb77b Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 7 May 2020 12:47:43 +0200 Subject: [PATCH 1/3] more granular control of monitoring --- .../pkg/agent/operation/monitoring.go | 69 +++++++++++++------ .../pkg/agent/operation/monitoring_test.go | 2 +- .../pkg/agent/operation/operator.go | 7 +- 3 files changed, 55 insertions(+), 23 deletions(-) diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index 5c1fcb6d79c4..dd3740b94652 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -16,12 +16,14 @@ const ( monitoringName = "FLEET_MONITORING" outputKey = "output" monitoringEnabledSubkey = "enabled" + logsProcessName = "filebeat" + metricsProcessName = "metricbeat" ) func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { // if monitoring is disabled and running stop it if !o.monitor.IsMonitoringEnabled() { - if o.isMonitoring { + if o.isMonitoring != 0 { o.logger.Info("operator.handleStartSidecar: monitoring is running and disabled, proceeding to stop") return o.handleStopSidecar(s) } @@ -30,9 +32,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { return nil } - o.isMonitoring = true - - for _, step := range o.getMonitoringSteps(s, o.monitor.WatchLogs(), o.monitor.WatchMetrics()) { + for _, step := range o.getMonitoringSteps(s) { p, cfg, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags()) if err != nil { return errors.New(err, @@ -45,10 +45,14 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { if step.ID == configrequest.StepRemove { if err := o.stop(p); err != nil { result = multierror.Append(err, err) + } else { + o.markStopMonitoring(step.Process) } } else { if err := o.start(p, cfg); err != nil { result = multierror.Append(err, err) + } else { + o.markStartMonitoring(step.Process) } } } @@ -57,7 +61,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) { } func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) { - for _, step := range o.getMonitoringSteps(s, true, true) { + for _, step := range o.getMonitoringSteps(s) { p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags()) if err != nil { return errors.New(err, @@ -70,12 +74,7 @@ func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) { if err := o.stop(p); err != nil { result = multierror.Append(err, err) } - } - - // if result != nil then something might be still running, setting isMonitoring to false - // will prevent tearing it down in a future - if result == nil { - o.isMonitoring = false + o.markStopMonitoring(step.Process) } return result @@ -87,7 +86,7 @@ func monitoringTags() map[app.Tag]string { } } -func (o *Operator) getMonitoringSteps(step configrequest.Step, watchLogs, watchMetrics bool) []configrequest.Step { +func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.Step { // get output config, err := getConfigFromStep(step) if err != nil { @@ -113,22 +112,25 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step, watchLogs, watchM return nil } - return o.generateMonitoringSteps(step.Version, output, watchLogs, watchMetrics) + return o.generateMonitoringSteps(step.Version, output) } -func (o *Operator) generateMonitoringSteps(version string, output interface{}, watchLogs, watchMetrics bool) []configrequest.Step { +func (o *Operator) generateMonitoringSteps(version string, output interface{}) []configrequest.Step { var steps []configrequest.Step + watchLogs := o.monitor.WatchLogs() + watchMetrics := o.monitor.WatchMetrics() - if watchLogs { + // generate only on change + if watchLogs != o.isMonitoringLogs() { fbConfig, any := o.getMonitoringFilebeatConfig(output) stepID := configrequest.StepRun - if !any { + if !watchLogs || !any { stepID = configrequest.StepRemove } filebeatStep := configrequest.Step{ ID: stepID, Version: version, - Process: "filebeat", + Process: logsProcessName, Meta: map[string]interface{}{ configrequest.MetaConfigKey: fbConfig, }, @@ -136,18 +138,17 @@ func (o *Operator) generateMonitoringSteps(version string, output interface{}, w steps = append(steps, filebeatStep) } - - if watchMetrics { + if watchMetrics != o.isMonitoringMetrics() { mbConfig, any := o.getMonitoringMetricbeatConfig(output) stepID := configrequest.StepRun - if !any { + if !watchMetrics || !any { stepID = configrequest.StepRemove } metricbeatStep := configrequest.Step{ ID: stepID, Version: version, - Process: "metricbeat", + Process: metricsProcessName, Meta: map[string]interface{}{ configrequest.MetaConfigKey: mbConfig, }, @@ -273,3 +274,29 @@ func (o *Operator) getMetricbeatEndpoints() []string { return endpoints } + +func (o *Operator) markStopMonitoring(process string) { + switch process { + case logsProcessName: + o.isMonitoring ^= isMonitoringLogsFlag + case metricsProcessName: + o.isMonitoring ^= isMonitoringMetricsFlag + } +} + +func (o *Operator) markStartMonitoring(process string) { + switch process { + case logsProcessName: + o.isMonitoring |= isMonitoringLogsFlag + case metricsProcessName: + o.isMonitoring |= isMonitoringMetricsFlag + } +} + +func (o *Operator) isMonitoringLogs() bool { + return (o.isMonitoring & isMonitoringLogsFlag) != 0 +} + +func (o *Operator) isMonitoringMetrics() bool { + return (o.isMonitoring & isMonitoringMetricsFlag) != 0 +} diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index e313a1a07b95..826e5bbd03d6 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -43,7 +43,7 @@ func TestGenerateSteps(t *testing.T) { t.Run(tc.Name, func(t *testing.T) { m := &testMonitor{monitorLogs: tc.Config.MonitorLogs, monitorMetrics: tc.Config.MonitorMetrics} operator, _ := getMonitorableTestOperator(t, "tests/scripts", m) - steps := operator.generateMonitoringSteps("8.0", sampleOutput, tc.Config.MonitorLogs, tc.Config.MonitorMetrics) + steps := operator.generateMonitoringSteps("8.0", sampleOutput) if actualSteps := len(steps); actualSteps != tc.ExpectedSteps { t.Fatalf("invalid number of steps, expected %v, got %v", tc.ExpectedSteps, actualSteps) } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index e8ce81f2c406..22a6eb1a388e 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -26,6 +26,11 @@ import ( rconfig "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig/grpc" ) +const ( + isMonitoringMetricsFlag = 1 << 0 + isMonitoringLogsFlag = 1 << 1 +) + // Operator runs Start/Stop/Update operations // it is responsible for detecting reconnect to existing processes // based on backed up configuration @@ -40,7 +45,7 @@ type Operator struct { stateResolver *stateresolver.StateResolver eventProcessor callbackHooks monitor monitoring.Monitor - isMonitoring bool + isMonitoring int apps map[string]Application appsLock sync.Mutex From 7c0c417f378b219f161c7299ccad6a9d0fd24e2a Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 7 May 2020 12:53:24 +0200 Subject: [PATCH 2/3] changelog --- x-pack/elastic-agent/CHANGELOG.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index 185258131415..68a39e762f1d 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -33,6 +33,7 @@ - Fix default configuration after enroll {pull}18232[18232] - Fix make sure the collected logs or metrics include streams information. {pull}18261[18261] - Stop monitoring on config change {pull}18284[18284] +- Enable more granular control of monitoring {pull}18346[18346] ==== New features From 741661b046892637550d7b0a56465b8a1178c623 Mon Sep 17 00:00:00 2001 From: Michal Pristas Date: Thu, 7 May 2020 12:54:46 +0200 Subject: [PATCH 3/3] a --- x-pack/elastic-agent/pkg/agent/operation/monitoring.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go index dd3740b94652..de852f7b13ad 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring.go @@ -73,8 +73,9 @@ func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) { o.logger.Debugf("stopping program %v", p) if err := o.stop(p); err != nil { result = multierror.Append(err, err) + } else { + o.markStopMonitoring(step.Process) } - o.markStopMonitoring(step.Process) } return result