From e47f7e30e44d2991f12065ea8986c2ca085c2854 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 23 Jan 2023 11:30:47 -0500 Subject: [PATCH] Add checks to ensure reloading of units if the configuration actually changed. (#34346) * Add checks to ensure reloading of units if the configuration actually changed. * Add changelog entry. (cherry picked from commit 5b1f82813200097ea00e6ff22af5c0d2d2ced314) --- CHANGELOG.next.asciidoc | 7 ++- x-pack/libbeat/management/managerV2.go | 85 ++++++++++++++++++-------- 2 files changed, 64 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2de6669566f..9e3437b068d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,12 +39,17 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix namespacing on self-monitoring {pull}32336[32336] - Fix race condition when stopping runners {pull}32433[32433] - Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491] +- Fix Windows service install/uninstall when Win32_Service returns error, add logic to wait until the Windows Service is stopped before proceeding. {pull}33322[33322] +- Support for multiline zookeeper logs {issue}2496[2496] +- Allow `clock_nanosleep` in the default seccomp profiles for amd64 and 386. Newer versions of glibc (e.g. 2.31) require it. {issue}33792[33792] +- Disable lockfile when running under elastic-agent. {pull}33988[33988] +- Add checks to ensure reloading of units if the configuration actually changed. {pull}34346[34346] *Auditbeat* *Filebeat* -- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for +- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for automatic splitting at root level, if root level element is an array. {pull}34155[34155] - [httpsjon] Improved error handling during pagination with chaining & split processor {pull}34127[34127] - [Azure blob storage] Added support for more mime types & introduced offset tracking via cursor state. {pull}33981[33981] diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 70c196e347a..fbca2c28032 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -13,18 +13,19 @@ import ( "syscall" "time" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/gofrs/uuid" "github.com/joeshaw/multierror" "go.uber.org/zap/zapcore" - - "github.com/gofrs/uuid" + gproto "google.golang.org/protobuf/proto" "gopkg.in/yaml.v2" "github.com/elastic/beats/v7/libbeat/common/reload" lbmanagement "github.com/elastic/beats/v7/libbeat/management" "github.com/elastic/beats/v7/libbeat/version" - "github.com/elastic/elastic-agent-client/v7/pkg/client" - conf "github.com/elastic/elastic-agent-libs/config" - "github.com/elastic/elastic-agent-libs/logp" ) // unitKey is used to identify a unique unit in a map @@ -66,13 +67,17 @@ type BeatV2Manager struct { isRunning bool - // is set on first instance of a config reload, - // allowing us to restart the beat if stopOnOutputReload is set - outputIsConfigured bool + // set with the last applied output config + // allows tracking if the configuration actually changed and if the + // beat needs to restart if stopOnOutputReload is set + lastOutputCfg *proto.UnitExpectedConfig + + // set with the last applied input configs + lastInputCfgs map[string]*proto.UnitExpectedConfig // used for the debug callback to report as-running config - lastOutputCfg *reload.ConfigWithMeta - lastInputCfg []*reload.ConfigWithMeta + lastBeatOutputCfg *reload.ConfigWithMeta + lastBeatInputCfgs []*reload.ConfigWithMeta } // ================================ @@ -496,22 +501,31 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error { return fmt.Errorf("failed to reload output: %w", err) } cm.lastOutputCfg = nil - return nil - } - - if cm.stopOnOutputReload && cm.outputIsConfigured { - cm.logger.Info("beat is restarting because output changed") - _ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil) - cm.Stop() + cm.lastBeatOutputCfg = nil return nil } _, _, rawConfig := unit.Expected() if rawConfig == nil { + // should not happen; hard stop return fmt.Errorf("output unit has no config") } + + if cm.lastOutputCfg != nil && gproto.Equal(cm.lastOutputCfg, rawConfig) { + // configuration for the output did not change; do nothing + cm.logger.Debug("Skipped reloading output; configuration didn't change") + return nil + } + cm.logger.Debugf("Got output unit config '%s'", rawConfig.GetId()) + if cm.stopOnOutputReload && cm.lastOutputCfg != nil { + cm.logger.Info("beat is restarting because output changed") + _ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil) + cm.Stop() + return nil + } + reloadConfig, err := groupByOutputs(rawConfig) if err != nil { return fmt.Errorf("failed to generate config for output: %w", err) @@ -521,9 +535,8 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error { if err != nil { return fmt.Errorf("failed to reload output: %w", err) } - cm.lastOutputCfg = reloadConfig - // set to true, we'll reload the output if we need to re-configure - cm.outputIsConfigured = true + cm.lastOutputCfg = rawConfig + cm.lastBeatOutputCfg = reloadConfig return nil } @@ -533,22 +546,40 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error { return fmt.Errorf("failed to find beat reloadable type 'input'") } - var inputCfgs []*reload.ConfigWithMeta + inputCfgs := make(map[string]*proto.UnitExpectedConfig, len(inputUnits)) + inputBeatCfgs := make([]*reload.ConfigWithMeta, 0, len(inputUnits)) agentInfo := cm.client.AgentInfo() for _, unit := range inputUnits { _, _, rawConfig := unit.Expected() + if rawConfig == nil { + // should not happen; hard stop + return fmt.Errorf("input unit %s has no config", unit.ID()) + } + + var prevCfg *proto.UnitExpectedConfig + if cm.lastInputCfgs != nil { + prevCfg, _ = cm.lastInputCfgs[unit.ID()] + } + if prevCfg != nil && gproto.Equal(prevCfg, rawConfig) { + // configuration for the input did not change; do nothing + cm.logger.Debugf("Skipped reloading input unit %s; configuration didn't change", unit.ID()) + continue + } + inputCfg, err := generateBeatConfig(rawConfig, agentInfo) if err != nil { return fmt.Errorf("failed to generate configuration for unit %s: %w", unit.ID(), err) } - inputCfgs = append(inputCfgs, inputCfg...) + inputCfgs[unit.ID()] = rawConfig + inputBeatCfgs = append(inputBeatCfgs, inputCfg...) } - err := obj.Reload(inputCfgs) + err := obj.Reload(inputBeatCfgs) if err != nil { return fmt.Errorf("failed to reloading inputs: %w", err) } - cm.lastInputCfg = inputCfgs + cm.lastInputCfgs = inputCfgs + cm.lastBeatInputCfgs = inputBeatCfgs return nil } @@ -557,7 +588,7 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error { func (cm *BeatV2Manager) handleDebugYaml() []byte { // generate input inputList := []map[string]interface{}{} - for _, module := range cm.lastInputCfg { + for _, module := range cm.lastBeatInputCfgs { var inputMap map[string]interface{} err := module.Config.Unpack(&inputMap) if err != nil { @@ -569,8 +600,8 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte { // generate output outputCfg := map[string]interface{}{} - if cm.lastOutputCfg != nil { - err := cm.lastOutputCfg.Config.Unpack(&outputCfg) + if cm.lastBeatOutputCfg != nil { + err := cm.lastBeatOutputCfg.Config.Unpack(&outputCfg) if err != nil { cm.logger.Errorf("error unpacking output config for debug callback: %s", err) return nil