Skip to content

Commit

Permalink
[Elastic-Agent] Enable more granular control of monitoring (elastic#1…
Browse files Browse the repository at this point in the history
…8346)

[Elastic-Agent] Enable more granular control of monitoring (elastic#18346)
  • Loading branch information
michalpristas committed May 27, 2020
1 parent fb42a02 commit cfc4529
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 23 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
- Fix an issue where the checkin_frequency, jitter, and backoff options where not configurable. {pull}17843[17843]
- Ensure that the beats uses the params prefer_v2_templates on bulk request. {pull}18318[18318]
- Stop monitoring on config change {pull}18284[18284]
- Enable more granular control of monitoring {pull}18346[18346]
- Fix jq: command not found {pull}18408[18408]
- Avoid Chown on windows {pull}18512[18512]
- Clean action store after enrolling to new configuration {pull}18656[18656]
Expand Down
70 changes: 49 additions & 21 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ const (
monitoringName = "FLEET_MONITORING"
outputKey = "output"
monitoringEnabledSubkey = "enabled"
logsProcessName = "filebeat"
metricsProcessName = "metricbeat"
)

func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
// if monitoring is disabled and running stop it
if !o.monitor.IsMonitoringEnabled() {
if o.isMonitoring {
if o.isMonitoring != 0 {
o.logger.Info("operator.handleStartSidecar: monitoring is running and disabled, proceeding to stop")
return o.handleStopSidecar(s)
}
Expand All @@ -30,9 +32,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
return nil
}

o.isMonitoring = true

for _, step := range o.getMonitoringSteps(s, o.monitor.WatchLogs(), o.monitor.WatchMetrics()) {
for _, step := range o.getMonitoringSteps(s) {
p, cfg, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags())
if err != nil {
return errors.New(err,
Expand All @@ -45,10 +45,14 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
if step.ID == configrequest.StepRemove {
if err := o.stop(p); err != nil {
result = multierror.Append(err, err)
} else {
o.markStopMonitoring(step.Process)
}
} else {
if err := o.start(p, cfg); err != nil {
result = multierror.Append(err, err)
} else {
o.markStartMonitoring(step.Process)
}
}
}
Expand All @@ -57,7 +61,7 @@ func (o *Operator) handleStartSidecar(s configrequest.Step) (result error) {
}

func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) {
for _, step := range o.getMonitoringSteps(s, true, true) {
for _, step := range o.getMonitoringSteps(s) {
p, _, err := getProgramFromStepWithTags(step, o.config.DownloadConfig, monitoringTags())
if err != nil {
return errors.New(err,
Expand All @@ -69,15 +73,11 @@ func (o *Operator) handleStopSidecar(s configrequest.Step) (result error) {
o.logger.Debugf("stopping program %v", p)
if err := o.stop(p); err != nil {
result = multierror.Append(err, err)
} else {
o.markStopMonitoring(step.Process)
}
}

// if result != nil then something might be still running, setting isMonitoring to false
// will prevent tearing it down in a future
if result == nil {
o.isMonitoring = false
}

return result
}

Expand All @@ -87,7 +87,7 @@ func monitoringTags() map[app.Tag]string {
}
}

func (o *Operator) getMonitoringSteps(step configrequest.Step, watchLogs, watchMetrics bool) []configrequest.Step {
func (o *Operator) getMonitoringSteps(step configrequest.Step) []configrequest.Step {
// get output
config, err := getConfigFromStep(step)
if err != nil {
Expand All @@ -113,41 +113,43 @@ func (o *Operator) getMonitoringSteps(step configrequest.Step, watchLogs, watchM
return nil
}

return o.generateMonitoringSteps(step.Version, output, watchLogs, watchMetrics)
return o.generateMonitoringSteps(step.Version, output)
}

func (o *Operator) generateMonitoringSteps(version string, output interface{}, watchLogs, watchMetrics bool) []configrequest.Step {
func (o *Operator) generateMonitoringSteps(version string, output interface{}) []configrequest.Step {
var steps []configrequest.Step
watchLogs := o.monitor.WatchLogs()
watchMetrics := o.monitor.WatchMetrics()

if watchLogs {
// generate only on change
if watchLogs != o.isMonitoringLogs() {
fbConfig, any := o.getMonitoringFilebeatConfig(output)
stepID := configrequest.StepRun
if !any {
if !watchLogs || !any {
stepID = configrequest.StepRemove
}
filebeatStep := configrequest.Step{
ID: stepID,
Version: version,
Process: "filebeat",
Process: logsProcessName,
Meta: map[string]interface{}{
configrequest.MetaConfigKey: fbConfig,
},
}

steps = append(steps, filebeatStep)
}

if watchMetrics {
if watchMetrics != o.isMonitoringMetrics() {
mbConfig, any := o.getMonitoringMetricbeatConfig(output)
stepID := configrequest.StepRun
if !any {
if !watchMetrics || !any {
stepID = configrequest.StepRemove
}

metricbeatStep := configrequest.Step{
ID: stepID,
Version: version,
Process: "metricbeat",
Process: metricsProcessName,
Meta: map[string]interface{}{
configrequest.MetaConfigKey: mbConfig,
},
Expand Down Expand Up @@ -273,3 +275,29 @@ func (o *Operator) getMetricbeatEndpoints() []string {

return endpoints
}

func (o *Operator) markStopMonitoring(process string) {
switch process {
case logsProcessName:
o.isMonitoring ^= isMonitoringLogsFlag
case metricsProcessName:
o.isMonitoring ^= isMonitoringMetricsFlag
}
}

func (o *Operator) markStartMonitoring(process string) {
switch process {
case logsProcessName:
o.isMonitoring |= isMonitoringLogsFlag
case metricsProcessName:
o.isMonitoring |= isMonitoringMetricsFlag
}
}

func (o *Operator) isMonitoringLogs() bool {
return (o.isMonitoring & isMonitoringLogsFlag) != 0
}

func (o *Operator) isMonitoringMetrics() bool {
return (o.isMonitoring & isMonitoringMetricsFlag) != 0
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestGenerateSteps(t *testing.T) {
t.Run(tc.Name, func(t *testing.T) {
m := &testMonitor{monitorLogs: tc.Config.MonitorLogs, monitorMetrics: tc.Config.MonitorMetrics}
operator, _ := getMonitorableTestOperator(t, "tests/scripts", m)
steps := operator.generateMonitoringSteps("8.0", sampleOutput, tc.Config.MonitorLogs, tc.Config.MonitorMetrics)
steps := operator.generateMonitoringSteps("8.0", sampleOutput)
if actualSteps := len(steps); actualSteps != tc.ExpectedSteps {
t.Fatalf("invalid number of steps, expected %v, got %v", tc.ExpectedSteps, actualSteps)
}
Expand Down
7 changes: 6 additions & 1 deletion x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
rconfig "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/remoteconfig/grpc"
)

const (
isMonitoringMetricsFlag = 1 << 0
isMonitoringLogsFlag = 1 << 1
)

// Operator runs Start/Stop/Update operations
// it is responsible for detecting reconnect to existing processes
// based on backed up configuration
Expand All @@ -40,7 +45,7 @@ type Operator struct {
stateResolver *stateresolver.StateResolver
eventProcessor callbackHooks
monitor monitoring.Monitor
isMonitoring bool
isMonitoring int

apps map[string]Application
appsLock sync.Mutex
Expand Down

0 comments on commit cfc4529

Please sign in to comment.