diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index c4bcc67e780..cf93fe5a690 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -46,6 +46,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only. Your magefile.go will require a change to adapt the devtool API. See the pull request for more details. {pull}18148[18148] - The Elasticsearch client settings expect the API key to be raw (not base64-encoded). {issue}18939[18939] {pull}18945[18945] +- `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114] +- `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114] ==== Bugfixes diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index fb94a26762a..7de7b5d541f 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -114,7 +114,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) { haveEnabledInputs = true } - if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil && !b.ConfigManager.Enabled() { + if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil && !b.Manager.Enabled() { if !b.InSetupCmd { return nil, errors.New("no modules or inputs enabled and configuration reloading disabled. What files do you want me to watch?") } diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 1ad682fc496..817fea40b6d 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -86,7 +86,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { return err } - if b.ConfigManager.Enabled() { + if b.Manager.Enabled() { bt.RunCentralMgmtMonitors(b) } diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index 75585ba8992..f5da2db9d0e 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -66,7 +66,7 @@ type Beat struct { Fields []byte // Data from fields.yml - ConfigManager management.ConfigManager // config manager + Manager management.Manager // manager Keystore keystore.Keystore } diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index d962207463a..f320b81c42c 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -330,12 +330,12 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { // Report central management state mgmt := monitoring.GetNamespace("state").GetRegistry().NewRegistry("management") - monitoring.NewBool(mgmt, "enabled").Set(b.ConfigManager.Enabled()) + monitoring.NewBool(mgmt, "enabled").Set(b.Manager.Enabled()) debugf("Initializing output plugins") outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled() if !outputEnabled { - if b.ConfigManager.Enabled() { + if b.Manager.Enabled() { logp.Info("Output is configured through Central Management") } else { msg := "No outputs are defined. Please define one under the output section." @@ -462,8 +462,8 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error { logp.Info("%s start running.", b.Info.Beat) // Launch config manager - b.ConfigManager.Start(beater.Stop) - defer b.ConfigManager.Stop() + b.Manager.Start(beater.Stop) + defer b.Manager.Stop() return beater.Run(&b.Beat) } @@ -643,12 +643,12 @@ func (b *Beat) configure(settings Settings) error { logp.Info("Beat ID: %v", b.Info.ID) // initialize config manager - b.ConfigManager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID) + b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID) if err != nil { return err } - if err := b.ConfigManager.CheckRawConfig(b.RawConfig); err != nil { + if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil { return err } diff --git a/libbeat/management/management.go b/libbeat/management/management.go index 690c3dba7f7..52e03fb6943 100644 --- a/libbeat/management/management.go +++ b/libbeat/management/management.go @@ -18,11 +18,36 @@ package management import ( + "sync" + "github.com/gofrs/uuid" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/logp" +) + +// Status describes the current status of the beat. +type Status int + +const ( + // Unknown is initial status when none has been reported. + Unknown Status = iota + // Starting is status describing application is starting. + Starting + // Configuring is status describing application is configuring. + Configuring + // Running is status describing application is running. + Running + // Degraded is status describing application is degraded. + Degraded + // Failed is status describing application is failed. This status should + // only be used in the case the beat should stop running as the failure + // cannot be recovered. + Failed + // Stopping is status describing application is stopping. + Stopping ) // Namespace is the feature namespace for queue definition. @@ -33,19 +58,28 @@ var DebugK = "centralmgmt" var centralMgmtKey = "x-pack-cm" -// ConfigManager interacts with the beat to update configurations -// from an external source -type ConfigManager interface { - // Enabled returns true if config manager is enabled +// StatusReporter provides a method to update current status of the beat. +type StatusReporter interface { + // UpdateStatus called when the status of the beat has changed. + UpdateStatus(status Status, msg string) +} + +// Manager interacts with the beat to provide status updates and to receive +// configurations. +type Manager interface { + StatusReporter + + // Enabled returns true if manager is enabled. Enabled() bool - // Start the config manager - Start(func()) + // Start the config manager giving it a stopFunc callback + // so the beat can be told when to stop. + Start(stopFunc func()) - // Stop the config manager + // Stop the config manager. Stop() - // CheckRawConfig check settings are correct before launching the beat + // CheckRawConfig check settings are correct before launching the beat. CheckRawConfig(cfg *common.Config) error } @@ -53,7 +87,7 @@ type ConfigManager interface { type PluginFunc func(*common.Config) FactoryFunc // FactoryFunc for creating a config manager -type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error) +type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) // Register a config manager func Register(name string, fn PluginFunc, stability feature.Stability) { @@ -91,13 +125,32 @@ func defaultModeConfig() *modeConfig { } // nilManager, fallback when no manager is present -type nilManager struct{} +type nilManager struct { + logger *logp.Logger + lock sync.Mutex + status Status + msg string +} -func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error) { - return nilManager{}, nil +func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) { + log := logp.NewLogger("mgmt") + return &nilManager{ + logger: log, + status: Unknown, + msg: "", + }, nil } -func (nilManager) Enabled() bool { return false } -func (nilManager) Start(_ func()) {} -func (nilManager) Stop() {} -func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil } +func (*nilManager) Enabled() bool { return false } +func (*nilManager) Start(_ func()) {} +func (*nilManager) Stop() {} +func (*nilManager) CheckRawConfig(cfg *common.Config) error { return nil } +func (n *nilManager) UpdateStatus(status Status, msg string) { + n.lock.Lock() + defer n.lock.Unlock() + if n.status != status || n.msg != msg { + n.status = status + n.msg = msg + n.logger.Infof("Status change to %s: %s", status, msg) + } +} diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index fbf9d23110f..bdd45ac3693 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -134,7 +134,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe return nil, errors.Wrap(err, "error reading configuration file") } - dynamicCfgEnabled := config.ConfigModules.Enabled() || config.Autodiscover != nil || b.ConfigManager.Enabled() + dynamicCfgEnabled := config.ConfigModules.Enabled() || config.Autodiscover != nil || b.Manager.Enabled() if !dynamicCfgEnabled && len(config.Modules) == 0 { return nil, mb.ErrEmptyConfig } diff --git a/x-pack/libbeat/management/fleet/manager.go b/x-pack/libbeat/management/fleet/manager.go index 55903480f06..9b20b17bc48 100644 --- a/x-pack/libbeat/management/fleet/manager.go +++ b/x-pack/libbeat/management/fleet/manager.go @@ -9,6 +9,7 @@ import ( "fmt" "os" "sort" + "sync" "github.com/gofrs/uuid" "github.com/pkg/errors" @@ -35,12 +36,15 @@ type Manager struct { registry *reload.Registry blacklist *xmanagement.ConfigBlacklist client *client.Client + lock sync.Mutex + status management.Status + msg string stopFunc func() } // NewFleetManager returns a X-Pack Beats Fleet Management manager. -func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { +func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) { c := defaultConfig() if config.Enabled() { if err := config.Unpack(&c); err != nil { @@ -51,7 +55,7 @@ func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID } // NewFleetManagerWithConfig returns a X-Pack Beats Fleet Management manager. -func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { +func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) { log := logp.NewLogger(management.DebugK) m := &Manager{ @@ -122,15 +126,28 @@ func (cm *Manager) CheckRawConfig(cfg *common.Config) error { return nil } +// UpdateStatus updates the manager with the current status for the beat. +func (cm *Manager) UpdateStatus(status management.Status, msg string) { + cm.lock.Lock() + defer cm.lock.Unlock() + + if cm.status != status || cm.msg != msg { + cm.status = status + cm.msg = msg + cm.client.Status(statusToProtoStatus(status), msg) + cm.logger.Infof("Status change to %s: %s", status, msg) + } +} + func (cm *Manager) OnConfig(s string) { - cm.client.Status(proto.StateObserved_CONFIGURING, "Updating configuration") + cm.UpdateStatus(management.Configuring, "Updating configuration") var configMap common.MapStr uconfig, err := common.NewConfigFrom(s) if err != nil { err = errors.Wrap(err, "config blocks unsuccessfully generated") cm.logger.Error(err) - cm.client.Status(proto.StateObserved_FAILED, err.Error()) + cm.UpdateStatus(management.Failed, err.Error()) return } @@ -138,7 +155,7 @@ func (cm *Manager) OnConfig(s string) { if err != nil { err = errors.Wrap(err, "config blocks unsuccessfully generated") cm.logger.Error(err) - cm.client.Status(proto.StateObserved_FAILED, err.Error()) + cm.UpdateStatus(management.Failed, err.Error()) return } @@ -146,14 +163,14 @@ func (cm *Manager) OnConfig(s string) { if err != nil { err = errors.Wrap(err, "could not apply the configuration") cm.logger.Error(err) - cm.client.Status(proto.StateObserved_FAILED, err.Error()) + cm.UpdateStatus(management.Failed, err.Error()) return } if errs := cm.apply(blocks); !errs.IsEmpty() { err = errors.Wrap(err, "could not apply the configuration") cm.logger.Error(err) - cm.client.Status(proto.StateObserved_FAILED, err.Error()) + cm.UpdateStatus(management.Failed, err.Error()) return } @@ -285,3 +302,25 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (api.ConfigBlocks, error) { return res, nil } + +func statusToProtoStatus(status management.Status) proto.StateObserved_Status { + switch status { + case management.Unknown: + // unknown is reported as healthy, as the status is unknown + return proto.StateObserved_HEALTHY + case management.Starting: + return proto.StateObserved_STARTING + case management.Configuring: + return proto.StateObserved_CONFIGURING + case management.Running: + return proto.StateObserved_HEALTHY + case management.Degraded: + return proto.StateObserved_DEGRADED + case management.Failed: + return proto.StateObserved_FAILED + case management.Stopping: + return proto.StateObserved_STOPPING + } + // unknown status, still reported as healthy + return proto.StateObserved_HEALTHY +} diff --git a/x-pack/libbeat/management/fleet/manager_test.go b/x-pack/libbeat/management/fleet/manager_test.go index 7af72a04291..7810886018a 100644 --- a/x-pack/libbeat/management/fleet/manager_test.go +++ b/x-pack/libbeat/management/fleet/manager_test.go @@ -7,9 +7,13 @@ package fleet import ( "testing" - "github.com/elastic/beats/v7/libbeat/common" + "github.com/stretchr/testify/assert" + + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/management" ) func TestConfigBlocks(t *testing.T) { @@ -53,6 +57,16 @@ output: } } +func TestStatusToProtoStatus(t *testing.T) { + assert.Equal(t, proto.StateObserved_HEALTHY, statusToProtoStatus(management.Unknown)) + assert.Equal(t, proto.StateObserved_STARTING, statusToProtoStatus(management.Starting)) + assert.Equal(t, proto.StateObserved_CONFIGURING, statusToProtoStatus(management.Configuring)) + assert.Equal(t, proto.StateObserved_HEALTHY, statusToProtoStatus(management.Running)) + assert.Equal(t, proto.StateObserved_DEGRADED, statusToProtoStatus(management.Degraded)) + assert.Equal(t, proto.StateObserved_FAILED, statusToProtoStatus(management.Failed)) + assert.Equal(t, proto.StateObserved_STOPPING, statusToProtoStatus(management.Stopping)) +} + type dummyReloadable struct{} func (dummyReloadable) Reload(config *reload.ConfigWithMeta) error { diff --git a/x-pack/libbeat/management/manager.go b/x-pack/libbeat/management/manager.go index b6b75b373ac..e9dbf7511a4 100644 --- a/x-pack/libbeat/management/manager.go +++ b/x-pack/libbeat/management/manager.go @@ -43,7 +43,7 @@ type ConfigManager struct { } // NewConfigManager returns a X-Pack Beats Central Management manager -func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { +func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) { c := defaultConfig() if config.Enabled() { if err := config.Unpack(&c); err != nil { @@ -54,7 +54,7 @@ func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID } // NewConfigManagerWithConfig returns a X-Pack Beats Central Management manager -func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) { +func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) { var client *api.Client var cache *Cache var blacklist *ConfigBlacklist @@ -152,6 +152,11 @@ func (cm *ConfigManager) CheckRawConfig(cfg *common.Config) error { return nil } +// UpdateStatus updates the manager with the current status for the beat. +func (cm *ConfigManager) UpdateStatus(_ management.Status, _ string) { + // do nothing; no longer under development and has been deprecated +} + func (cm *ConfigManager) worker() { defer cm.wg.Done()