Skip to content

Commit

Permalink
Add checks to ensure reloading of units if the configuration actually…
Browse files Browse the repository at this point in the history
… changed. (elastic#34346)

* Add checks to ensure reloading of units if the configuration actually changed.

* Add changelog entry.
  • Loading branch information
blakerouse authored Jan 23, 2023
1 parent 69ebd98 commit 5b1f828
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 28 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Support for multiline zookeeper logs {issue}2496[2496]
- Allow `clock_nanosleep` in the default seccomp profiles for amd64 and 386. Newer versions of glibc (e.g. 2.31) require it. {issue}33792[33792]
- Disable lockfile when running under elastic-agent. {pull}33988[33988]
- Add checks to ensure reloading of units if the configuration actually changed. {pull}34346[34346]

*Auditbeat*


*Filebeat*
- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for
- [GCS] Added support for more mime types & introduced offset tracking via cursor state. Also added support for
automatic splitting at root level, if root level element is an array. {pull}34155[34155]
- [httpsjon] Improved error handling during pagination with chaining & split processor {pull}34127[34127]
- [Azure blob storage] Added support for more mime types & introduced offset tracking via cursor state. {pull}33981[33981]
Expand Down
85 changes: 58 additions & 27 deletions x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@ import (
"syscall"
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/gofrs/uuid"
"github.com/joeshaw/multierror"
"go.uber.org/zap/zapcore"

"github.com/gofrs/uuid"
gproto "google.golang.org/protobuf/proto"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/libbeat/common/reload"
lbmanagement "github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/elastic-agent-client/v7/pkg/client"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// unitKey is used to identify a unique unit in a map
Expand Down Expand Up @@ -66,13 +67,17 @@ type BeatV2Manager struct {

isRunning bool

// is set on first instance of a config reload,
// allowing us to restart the beat if stopOnOutputReload is set
outputIsConfigured bool
// set with the last applied output config
// allows tracking if the configuration actually changed and if the
// beat needs to restart if stopOnOutputReload is set
lastOutputCfg *proto.UnitExpectedConfig

// set with the last applied input configs
lastInputCfgs map[string]*proto.UnitExpectedConfig

// used for the debug callback to report as-running config
lastOutputCfg *reload.ConfigWithMeta
lastInputCfg []*reload.ConfigWithMeta
lastBeatOutputCfg *reload.ConfigWithMeta
lastBeatInputCfgs []*reload.ConfigWithMeta
}

// ================================
Expand Down Expand Up @@ -496,22 +501,31 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error {
return fmt.Errorf("failed to reload output: %w", err)
}
cm.lastOutputCfg = nil
return nil
}

if cm.stopOnOutputReload && cm.outputIsConfigured {
cm.logger.Info("beat is restarting because output changed")
_ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil)
cm.Stop()
cm.lastBeatOutputCfg = nil
return nil
}

_, _, rawConfig := unit.Expected()
if rawConfig == nil {
// should not happen; hard stop
return fmt.Errorf("output unit has no config")
}

if cm.lastOutputCfg != nil && gproto.Equal(cm.lastOutputCfg, rawConfig) {
// configuration for the output did not change; do nothing
cm.logger.Debug("Skipped reloading output; configuration didn't change")
return nil
}

cm.logger.Debugf("Got output unit config '%s'", rawConfig.GetId())

if cm.stopOnOutputReload && cm.lastOutputCfg != nil {
cm.logger.Info("beat is restarting because output changed")
_ = unit.UpdateState(client.UnitStateStopping, "Restarting", nil)
cm.Stop()
return nil
}

reloadConfig, err := groupByOutputs(rawConfig)
if err != nil {
return fmt.Errorf("failed to generate config for output: %w", err)
Expand All @@ -521,9 +535,8 @@ func (cm *BeatV2Manager) reloadOutput(unit *client.Unit) error {
if err != nil {
return fmt.Errorf("failed to reload output: %w", err)
}
cm.lastOutputCfg = reloadConfig
// set to true, we'll reload the output if we need to re-configure
cm.outputIsConfigured = true
cm.lastOutputCfg = rawConfig
cm.lastBeatOutputCfg = reloadConfig
return nil
}

Expand All @@ -533,22 +546,40 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
return fmt.Errorf("failed to find beat reloadable type 'input'")
}

var inputCfgs []*reload.ConfigWithMeta
inputCfgs := make(map[string]*proto.UnitExpectedConfig, len(inputUnits))
inputBeatCfgs := make([]*reload.ConfigWithMeta, 0, len(inputUnits))
agentInfo := cm.client.AgentInfo()
for _, unit := range inputUnits {
_, _, rawConfig := unit.Expected()
if rawConfig == nil {
// should not happen; hard stop
return fmt.Errorf("input unit %s has no config", unit.ID())
}

var prevCfg *proto.UnitExpectedConfig
if cm.lastInputCfgs != nil {
prevCfg, _ = cm.lastInputCfgs[unit.ID()]
}
if prevCfg != nil && gproto.Equal(prevCfg, rawConfig) {
// configuration for the input did not change; do nothing
cm.logger.Debugf("Skipped reloading input unit %s; configuration didn't change", unit.ID())
continue
}

inputCfg, err := generateBeatConfig(rawConfig, agentInfo)
if err != nil {
return fmt.Errorf("failed to generate configuration for unit %s: %w", unit.ID(), err)
}
inputCfgs = append(inputCfgs, inputCfg...)
inputCfgs[unit.ID()] = rawConfig
inputBeatCfgs = append(inputBeatCfgs, inputCfg...)
}

err := obj.Reload(inputCfgs)
err := obj.Reload(inputBeatCfgs)
if err != nil {
return fmt.Errorf("failed to reloading inputs: %w", err)
}
cm.lastInputCfg = inputCfgs
cm.lastInputCfgs = inputCfgs
cm.lastBeatInputCfgs = inputBeatCfgs
return nil
}

Expand All @@ -557,7 +588,7 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error {
func (cm *BeatV2Manager) handleDebugYaml() []byte {
// generate input
inputList := []map[string]interface{}{}
for _, module := range cm.lastInputCfg {
for _, module := range cm.lastBeatInputCfgs {
var inputMap map[string]interface{}
err := module.Config.Unpack(&inputMap)
if err != nil {
Expand All @@ -569,8 +600,8 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte {

// generate output
outputCfg := map[string]interface{}{}
if cm.lastOutputCfg != nil {
err := cm.lastOutputCfg.Config.Unpack(&outputCfg)
if cm.lastBeatOutputCfg != nil {
err := cm.lastBeatOutputCfg.Config.Unpack(&outputCfg)
if err != nil {
cm.logger.Errorf("error unpacking output config for debug callback: %s", err)
return nil
Expand Down

0 comments on commit 5b1f828

Please sign in to comment.