Rename to management.Manager, add UpdateStatus to Manager interface. #19114

Merged · 4 commits · Jun 12, 2020
Changes from 3 commits

2 changes: 2 additions & 0 deletions CHANGELOG-developer.next.asciidoc
@@ -46,6 +46,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
Your magefile.go will require a change to adapt the devtool API. See the pull request for
more details. {pull}18148[18148]
- The Elasticsearch client settings expect the API key to be raw (not base64-encoded). {issue}18939[18939] {pull}18945[18945]
- `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114]
- `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114]

==== Bugfixes

2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
@@ -114,7 +114,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
haveEnabledInputs = true
}

if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil && !b.ConfigManager.Enabled() {
if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil && !b.Manager.Enabled() {
if !b.InSetupCmd {
return nil, errors.New("no modules or inputs enabled and configuration reloading disabled. What files do you want me to watch?")
}
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
@@ -86,7 +86,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
return err
}

if b.ConfigManager.Enabled() {
if b.Manager.Enabled() {
bt.RunCentralMgmtMonitors(b)
}

2 changes: 1 addition & 1 deletion libbeat/beat/beat.go
@@ -66,7 +66,7 @@ type Beat struct {

Fields []byte // Data from fields.yml

ConfigManager management.ConfigManager // config manager
Manager management.Manager // manager

Keystore keystore.Keystore
}
12 changes: 6 additions & 6 deletions libbeat/cmd/instance/beat.go
@@ -330,12 +330,12 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {

// Report central management state
mgmt := monitoring.GetNamespace("state").GetRegistry().NewRegistry("management")
monitoring.NewBool(mgmt, "enabled").Set(b.ConfigManager.Enabled())
monitoring.NewBool(mgmt, "enabled").Set(b.Manager.Enabled())

debugf("Initializing output plugins")
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
if !outputEnabled {
if b.ConfigManager.Enabled() {
if b.Manager.Enabled() {
logp.Info("Output is configured through Central Management")
} else {
msg := "No outputs are defined. Please define one under the output section."
@@ -462,8 +462,8 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
logp.Info("%s start running.", b.Info.Beat)

// Launch config manager
b.ConfigManager.Start(beater.Stop)
defer b.ConfigManager.Stop()
b.Manager.Start(beater.Stop)
defer b.Manager.Stop()

return beater.Run(&b.Beat)
}
@@ -643,12 +643,12 @@ func (b *Beat) configure(settings Settings) error {
logp.Info("Beat ID: %v", b.Info.ID)

// initialize config manager
b.ConfigManager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
if err != nil {
return err
}

if err := b.ConfigManager.CheckRawConfig(b.RawConfig); err != nil {
if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil {
return err
}

80 changes: 64 additions & 16 deletions libbeat/management/management.go
@@ -18,11 +18,36 @@
package management

import (
"sync"

"github.com/gofrs/uuid"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/logp"
)

// Status describes the current status of the beat.
type Status int

const (
// Unknown is initial status when none has been reported.
Unknown Status = iota
// Starting is status describing application is starting.
Starting
// Configuring is status describing application is configuring.
Configuring
// Running is status describing application is running.
Running
// Degraded is status describing application is degraded.
Degraded
// Failed is status describing application is failed. This status should
// only be used in the case the beat should stop running as the failure
// cannot be recovered.
Failed
// Stopping is status describing application is stopping.
Stopping
)

// Namespace is the feature namespace for queue definition.
@@ -33,27 +58,31 @@ var DebugK = "centralmgmt"

var centralMgmtKey = "x-pack-cm"

// ConfigManager interacts with the beat to update configurations
// from an external source
type ConfigManager interface {
// Enabled returns true if config manager is enabled
// Manager interacts with the beat to provide status updates and to receive
// configurations.
type Manager interface {
// Enabled returns true if manager is enabled.
Enabled() bool

// Start the config manager
Start(func())
// Start the config manager giving it a stopFunc callback
// so the beat can be told when to stop.
Start(stopFunc func())

// Stop the config manager
// Stop the config manager.
Stop()

// CheckRawConfig check settings are correct before launching the beat
// CheckRawConfig check settings are correct before launching the beat.
CheckRawConfig(cfg *common.Config) error

// UpdateStatus called when the status of the beat has changed.
UpdateStatus(status Status, msg string)
Reviewer:

How about moving the UpdateStatus method into its own interface? I'd like to push only 'status' reporting to the inputs/outputs and provide a set of helper functions that wrap a 'StatusReporter', merging status changes from multiple subsystems. But in the end we can also introduce the minimal interface in another package when we need it.

Contributor Author:

I can move it to its own interface; would it also still be in the management.Manager interface?

type Manager interface {
    StatusReporter

    ...
}
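
For reference, a minimal sketch of what that split could look like (hypothetical; the PR as merged keeps UpdateStatus directly on the Manager interface):

// StatusReporter is the hypothetical minimal interface suggested above,
// so inputs/outputs can report status without depending on the full manager.
type StatusReporter interface {
    UpdateStatus(status Status, msg string)
}

// Manager would then embed it alongside the lifecycle methods.
type Manager interface {
    StatusReporter

    Enabled() bool
    Start(stopFunc func())
    Stop()
    CheckRawConfig(cfg *common.Config) error
}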

}

// PluginFunc for creating FactoryFunc if it matches a config
type PluginFunc func(*common.Config) FactoryFunc

// FactoryFunc for creating a config manager
type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error)
type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (Manager, error)

// Register a config manager
func Register(name string, fn PluginFunc, stability feature.Stability) {
@@ -91,13 +120,32 @@ func defaultModeConfig() *modeConfig {
}

// nilManager, fallback when no manager is present
type nilManager struct{}
type nilManager struct {
logger *logp.Logger
lock sync.Mutex
status Status
msg string
}

func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error) {
return nilManager{}, nil
func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) {
log := logp.NewLogger("mgmt")
return &nilManager{
logger: log,
status: Unknown,
msg: "",
}, nil
}

func (nilManager) Enabled() bool { return false }
func (nilManager) Start(_ func()) {}
func (nilManager) Stop() {}
func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
func (*nilManager) Enabled() bool { return false }
func (*nilManager) Start(_ func()) {}
func (*nilManager) Stop() {}
func (*nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
func (n *nilManager) UpdateStatus(status Status, msg string) {
n.lock.Lock()
Contributor:

Do we need this here if this is the nil manager, i.e. a noop manager?

Contributor Author:

I do it only for logging the status changes. I think it is good to show status updates in the logger as well, so it's clear what is happening in the beat, even if it's not controlled by Elastic Agent.

defer n.lock.Unlock()
if n.status != status || n.msg != msg {
n.status = status
n.msg = msg
n.logger.Infof("Status change to %s: %s", status, msg)
}
}
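
One note on the logging above: "Status change to %s" assumes Status implements fmt.Stringer, which this diff does not show. A minimal sketch of such a method (an assumption, not part of this diff):

// Hypothetical String method for Status; without a fmt.Stringer
// implementation, %s renders the raw value as %!s(management.Status=N).
func (s Status) String() string {
    names := []string{
        "Unknown", "Starting", "Configuring",
        "Running", "Degraded", "Failed", "Stopping",
    }
    if s < 0 || int(s) >= len(names) {
        return "Unknown"
    }
    return names[s]
}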
2 changes: 1 addition & 1 deletion metricbeat/beater/metricbeat.go
@@ -134,7 +134,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
return nil, errors.Wrap(err, "error reading configuration file")
}

dynamicCfgEnabled := config.ConfigModules.Enabled() || config.Autodiscover != nil || b.ConfigManager.Enabled()
dynamicCfgEnabled := config.ConfigModules.Enabled() || config.Autodiscover != nil || b.Manager.Enabled()
if !dynamicCfgEnabled && len(config.Modules) == 0 {
return nil, mb.ErrEmptyConfig
}
54 changes: 47 additions & 7 deletions x-pack/libbeat/management/fleet/manager.go
@@ -9,6 +9,7 @@ import (
"fmt"
"os"
"sort"
"sync"

"github.com/gofrs/uuid"
"github.com/pkg/errors"
@@ -35,12 +36,15 @@ type Manager struct {
registry *reload.Registry
blacklist *xmanagement.ConfigBlacklist
client *client.Client
lock sync.Mutex
status management.Status
msg string

stopFunc func()
}

// NewFleetManager returns a X-Pack Beats Fleet Management manager.
func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
c := defaultConfig()
if config.Enabled() {
if err := config.Unpack(&c); err != nil {
@@ -51,7 +55,7 @@
}

// NewFleetManagerWithConfig returns a X-Pack Beats Fleet Management manager.
func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
log := logp.NewLogger(management.DebugK)

m := &Manager{
@@ -122,38 +126,52 @@ func (cm *Manager) CheckRawConfig(cfg *common.Config) error {
return nil
}

// UpdateStatus updates the manager with the current status for the beat.
func (cm *Manager) UpdateStatus(status management.Status, msg string) {
cm.lock.Lock()
if cm.status != status || cm.msg != msg {
cm.status = status
cm.msg = msg
cm.lock.Unlock()
Contributor:

We should test this with concurrent requests; what I fear may occur is that with many concurrent changes of status we will end up with a different internal state vs the reported state. Do you see some pros/cons of doing the reporting using a channel, or of having a full-flow lock?

Reviewer:

We can also try to 'forget' outdated status updates if we get new ones while we are waiting. This could be implemented using sync.Cond.

Contributor Author:

My only reason to track the state at this level is so it can be logged when it is changed. We could remove the internal tracking and locking in the Fleet Manager if we just log every time UpdateStatus is called. Then the locking inside of client will handle updating the correct state to the manager.

Or we could place the call to client inside of the lock to ensure that it's always the same between the two.

Contributor:

I think the client option is also OK. I think logging inconsistent states will cause us more trouble than help us during some SDH triaging.

Contributor Author:

I went with moving the logging and client into the lock.

cm.client.Status(statusToProtoStatus(status), msg)
Contributor (@michalpristas, Jun 12, 2020):

Here we update the status on the client, which will result in some action either during check-in or in the watcher after some timeout. If somebody reports failure, I think (correct me if my memory failed) we talked about re-starting the application immediately; can you make sure this flow is valid?

Contributor Author:

This flow is valid from the side of the Elastic Agent, as reporting Failed will cause the application to be killed and restarted. This is fully tested and covered in the Elastic Agent unit tests, both in the case that the application returns Failed over the protocol and in the case that the application just crashes.

cm.logger.Infof("Status change to %s: %s", status, msg)
return
}
cm.lock.Unlock()
Reviewer:

Why unlock before the call to client.Status? If we have a race between the unlock and the Status being sent by two goroutines, how can you tell which one is the newer status update?

Contributor Author:

Yeah, I was commenting on that as well; I can move the client.Status call inside of the lock. I think that would be the best.

}
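
Following the resolution above ("I went with moving the logging and client into the lock"), the revised method would look roughly like this (a sketch of the discussed follow-up, not the exact merged code):

func (cm *Manager) UpdateStatus(status management.Status, msg string) {
    cm.lock.Lock()
    defer cm.lock.Unlock()

    if cm.status != status || cm.msg != msg {
        cm.status = status
        cm.msg = msg
        // Report and log under the lock so the internal state and the
        // state reported to the Elastic Agent cannot diverge.
        cm.client.Status(statusToProtoStatus(status), msg)
        cm.logger.Infof("Status change to %s: %s", status, msg)
    }
}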

func (cm *Manager) OnConfig(s string) {
cm.client.Status(proto.StateObserved_CONFIGURING, "Updating configuration")
cm.UpdateStatus(management.Configuring, "Updating configuration")
Contributor:

Should we switch back after the config is applied, or do we let the beat do that?

Contributor Author:

It does switch back to Running at the end of OnConfig.

Contributor:

You're right, it was hidden so I overlooked it.

Reviewer:

If I read this right OnConfig sets the status to 'Healthy' at the end. Are we sure this is the correct state to report? Do we 'forget' the internal state here?

How about removing the 'Configuring' status? The less states (and protocol behavior) we have, the less we can get it wrong.

Contributor Author:

I personally do not like the idea of setting the status in the OnConfig handler, I feel like that should really only be done specifically by the beat itself. So if OnConfig passes it to the reloader then the reloader should really set the Status.

At the moment I keep it this way so it works, and once status is being reported correctly by the beat, this should be removed (probably at the same time).

I do not think we should remove Configuring; I think it is a key state for watching the observability of the Beat and propagating that information to Fleet.

It is good to know that going Healthy -> Configuring -> Failed is because everything was good, then updating the configuration caused it to fail.

Reviewer:

> I personally do not like the idea of setting the status in the OnConfig handler, I feel like that should really only be done specifically by the beat itself. So if OnConfig passes it to the reloader then the reloader should really set the Status.

> At the moment I keep it this way so it works, and once status is being reported correctly by the beat, this should be removed (probably at the same time).

+1
Is this required change/cleanup documented in an issue?


var configMap common.MapStr
uconfig, err := common.NewConfigFrom(s)
if err != nil {
err = errors.Wrap(err, "config blocks unsuccessfully generated")
cm.logger.Error(err)
cm.client.Status(proto.StateObserved_FAILED, err.Error())
cm.UpdateStatus(management.Failed, err.Error())
return
}

err = uconfig.Unpack(&configMap)
if err != nil {
err = errors.Wrap(err, "config blocks unsuccessfully generated")
cm.logger.Error(err)
cm.client.Status(proto.StateObserved_FAILED, err.Error())
cm.UpdateStatus(management.Failed, err.Error())
return
}

blocks, err := cm.toConfigBlocks(configMap)
if err != nil {
err = errors.Wrap(err, "could not apply the configuration")
cm.logger.Error(err)
cm.client.Status(proto.StateObserved_FAILED, err.Error())
cm.UpdateStatus(management.Failed, err.Error())
Reviewer (@urso, Jun 12, 2020):

Given all these errors I wonder what 'Failed' means. Is the beat really 'failed' or are we (the overall application with agent) running in a degraded mode because not all configurations could have been applied? Even if we fail here the Beat continues to publish events for existing inputs, right?

Are we mixing the status of the 'operation' with the beat health here? If the operation fails, are we required to restart the Beat?

Contributor Author:

As stated in the list of statuses, there are both Failed and Degraded.

        // Degraded is status describing application is degraded.
	Degraded
	// Failed is status describing application is failed. This status should
	// only be used in the case the beat should stop running as the failure
	// cannot be recovered.
	Failed

In Degraded state the beat can keep sending events and the agent will not stop the process. Failed is failed: I have completely failed, restart me.

Reviewer:

Yeah, I've seen these status types. Is this method run on startup only, or also if the beat is already active? In the latter case the Beat will continue with the old state if there was an error decoding the configuration. Do we want to enforce a 'restart' if the config update operation failed? Will Failed trigger a restart by the Agent?

Are we mixing the status of the update 'operation' with the beat health here? If the operation fails, are we required to restart the Beat?

I'm more or less wondering about the overall semantics of OnConfig and the single errors we can get.

Contributor Author:

It will run both on startup and while running.

The beat doesn't need to worry about handling the restart on Failed status; the Agent will do that work for it, restarting it and bringing it back up.

return
}

if errs := cm.apply(blocks); !errs.IsEmpty() {
err = errors.Wrap(err, "could not apply the configuration")
cm.logger.Error(err)
cm.client.Status(proto.StateObserved_FAILED, err.Error())
cm.UpdateStatus(management.Failed, err.Error())
return
}
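
The remainder of OnConfig is collapsed in this view; per the "switch back" exchange above, it ends by reporting Running again once the blocks apply cleanly, presumably along the lines of:

// End of OnConfig (hidden in the collapsed hunk below): report Running
// again after the configuration has been applied successfully.
cm.UpdateStatus(management.Running, "")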

@@ -285,3 +303,25 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (api.ConfigBlocks, error) {

return res, nil
}

func statusToProtoStatus(status management.Status) proto.StateObserved_Status {
Contributor:

Can we add a unit test to test switching statuses back and forth with some noop client?

Contributor Author:

At the moment I don't believe client from elastic-agent-client is exposed as an interface that would allow me to completely replace it with a noop client.

switch status {
case management.Unknown:
// unknown is reported as healthy, as the status is unknown
return proto.StateObserved_HEALTHY
case management.Starting:
return proto.StateObserved_STARTING
case management.Configuring:
return proto.StateObserved_CONFIGURING
case management.Running:
return proto.StateObserved_HEALTHY
case management.Degraded:
return proto.StateObserved_DEGRADED
case management.Failed:
return proto.StateObserved_FAILED
case management.Stopping:
return proto.StateObserved_STOPPING
}
// unknown status, still reported as healthy
return proto.StateObserved_HEALTHY
}
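
The mapping itself is a pure function, so it can be exercised without any client; a possible table-driven sketch (hypothetical test, same package, assuming the standard testing import):

func TestStatusToProtoStatus(t *testing.T) {
    cases := map[management.Status]proto.StateObserved_Status{
        management.Unknown:     proto.StateObserved_HEALTHY,
        management.Starting:    proto.StateObserved_STARTING,
        management.Configuring: proto.StateObserved_CONFIGURING,
        management.Running:     proto.StateObserved_HEALTHY,
        management.Degraded:    proto.StateObserved_DEGRADED,
        management.Failed:      proto.StateObserved_FAILED,
        management.Stopping:    proto.StateObserved_STOPPING,
    }
    for status, want := range cases {
        if got := statusToProtoStatus(status); got != want {
            t.Errorf("statusToProtoStatus(%v) = %v, want %v", status, got, want)
        }
    }
}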
9 changes: 7 additions & 2 deletions x-pack/libbeat/management/manager.go
@@ -43,7 +43,7 @@ type ConfigManager struct {
}

// NewConfigManager returns a X-Pack Beats Central Management manager
func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
c := defaultConfig()
if config.Enabled() {
if err := config.Unpack(&c); err != nil {
@@ -54,7 +54,7 @@ func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID
}

// NewConfigManagerWithConfig returns a X-Pack Beats Central Management manager
func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
var client *api.Client
var cache *Cache
var blacklist *ConfigBlacklist
@@ -152,6 +152,11 @@ func (cm *ConfigManager) CheckRawConfig(cfg *common.Config) error {
return nil
}

// UpdateStatus updates the manager with the current status for the beat.
func (cm *ConfigManager) UpdateStatus(_ management.Status, _ string) {
// do nothing; no longer under development and has been deprecated
}

func (cm *ConfigManager) worker() {
defer cm.wg.Done()

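Taken together, the rename plus the new interface method mean a beat can now push status changes through its embedded manager. A minimal usage sketch (hypothetical beat code; SomeBeat, setup, and done are illustrative names, not part of this PR):

func (bt *SomeBeat) Run(b *beat.Beat) error {
    b.Manager.UpdateStatus(management.Starting, "somebeat is starting")

    if err := bt.setup(b); err != nil {
        // Degraded keeps the process alive; Failed would ask the
        // Elastic Agent to kill and restart the beat.
        b.Manager.UpdateStatus(management.Degraded, err.Error())
    } else {
        b.Manager.UpdateStatus(management.Running, "")
    }

    <-bt.done
    b.Manager.UpdateStatus(management.Stopping, "somebeat is stopping")
    return nil
}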