From a655d2e53416cc43e930bad02b46f32f8e2b2700 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Mon, 15 Jun 2020 13:08:13 -0400 Subject: [PATCH] [Agent] Improve the handling of failed applications, fixed crash in FleetManager, report all state changes to Fleet (#19178) * Improve the handling of restarting applications on failure and crashes. * Fix reporting in the fleet manager. Fix reporting in Agent to push all state changes of the application to fleet. * Fix imports. --- .../pkg/agent/application/managed_mode.go | 11 +- .../pkg/agent/application/stream.go | 29 +--- .../pkg/agent/operation/event_processor.go | 26 --- .../pkg/agent/operation/monitoring_test.go | 5 +- .../pkg/agent/operation/operation.go | 1 + .../pkg/agent/operation/operation_config.go | 12 +- .../pkg/agent/operation/operation_fetch.go | 13 +- .../pkg/agent/operation/operation_install.go | 13 +- .../pkg/agent/operation/operation_remove.go | 13 +- .../pkg/agent/operation/operation_start.go | 19 +-- .../pkg/agent/operation/operation_stop.go | 22 +-- .../pkg/agent/operation/operation_verify.go | 12 +- .../pkg/agent/operation/operator.go | 69 ++++---- .../elastic-agent/pkg/core/plugin/app/app.go | 131 ++++++++------- .../pkg/core/plugin/app/start.go | 35 ++-- .../pkg/core/plugin/app/status.go | 31 ++-- .../pkg/core/plugin/state/state.go | 21 +-- .../elastic-agent/pkg/core/server/server.go | 12 +- x-pack/elastic-agent/pkg/reporter/reporter.go | 111 +++++++------ .../pkg/reporter/reporter_test.go | 150 +++++++++++------- x-pack/libbeat/management/fleet/manager.go | 7 +- 21 files changed, 339 insertions(+), 404 deletions(-) delete mode 100644 x-pack/elastic-agent/pkg/agent/operation/event_processor.go diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index 3991045518f6..1f215869a3a9 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -116,7 +116,13 @@ func newManaged( managedApplication.bgContext, managedApplication.cancelCtxFn = context.WithCancel(ctx) managedApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{}) if err != nil { - return nil, errors.New(err, "initialize GRPC listener") + return nil, errors.New(err, "initialize GRPC listener", errors.TypeNetwork) + } + // must start before `Start` is called as Fleet will already try to start applications + // before `Start` is even called. + err = managedApplication.srv.Start() + if err != nil { + return nil, errors.New(err, "starting GRPC listener", errors.TypeNetwork) } logR := logreporter.NewReporter(log, cfg.Reporting.Log) @@ -208,9 +214,6 @@ func newManaged( // Start starts a managed elastic-agent. func (m *Managed) Start() error { m.log.Info("Agent is starting") - if err := m.srv.Start(); err != nil { - return err - } m.gateway.Start() return nil } diff --git a/x-pack/elastic-agent/pkg/agent/application/stream.go b/x-pack/elastic-agent/pkg/agent/application/stream.go index 0b3a73768c34..c8ebd40a795f 100644 --- a/x-pack/elastic-agent/pkg/agent/application/stream.go +++ b/x-pack/elastic-agent/pkg/agent/application/stream.go @@ -6,9 +6,6 @@ package application import ( "context" - "io" - "net/http" - "net/url" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation" @@ -19,30 +16,10 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" ) -// EventProcessor is an processor of application event -type reporter interface { - OnStarting(ctx context.Context, app string) - OnRunning(ctx context.Context, app string) - OnFailing(ctx context.Context, app string, err error) - OnStopping(ctx context.Context, app string) - OnStopped(ctx context.Context, app string) - OnFatal(ctx context.Context, app string, err error) -} - -type sender interface { - Send( - ctx context.Context, - method string, - path string, - params url.Values, - headers http.Header, - body io.Reader, - ) (*http.Response, error) -} - type operatorStream struct { configHandler ConfigHandler log *logger.Logger @@ -57,7 +34,7 @@ func (b *operatorStream) Execute(cfg *configRequest) error { return b.configHandler.HandleConfig(cfg) } -func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) { +func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) { return func(log *logger.Logger, id routingKey) (stream, error) { // new operator per stream to isolate processes without using tags operator, err := newOperator(ctx, log, id, cfg, srv, r, m) @@ -72,7 +49,7 @@ func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, } } -func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) (*operation.Operator, error) { +func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) { operatorConfig := operatorCfg.DefaultConfig() if err := config.Unpack(&operatorConfig); err != nil { return nil, err diff --git a/x-pack/elastic-agent/pkg/agent/operation/event_processor.go b/x-pack/elastic-agent/pkg/agent/operation/event_processor.go deleted file mode 100644 index ecbb98960cbc..000000000000 --- a/x-pack/elastic-agent/pkg/agent/operation/event_processor.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package operation - -import "context" - -// EventProcessor is an processor of application event -type callbackHooks interface { - OnStarting(ctx context.Context, app string) - OnRunning(ctx context.Context, app string) - OnFailing(ctx context.Context, app string, err error) - OnStopping(ctx context.Context, app string) - OnStopped(ctx context.Context, app string) - OnFatal(ctx context.Context, app string, err error) -} - -type noopCallbackHooks struct{} - -func (*noopCallbackHooks) OnStarting(ctx context.Context, app string) {} -func (*noopCallbackHooks) OnRunning(ctx context.Context, app string) {} -func (*noopCallbackHooks) OnFailing(ctx context.Context, app string, err error) {} -func (*noopCallbackHooks) OnStopping(ctx context.Context, app string) {} -func (*noopCallbackHooks) OnStopped(ctx context.Context, app string) {} -func (*noopCallbackHooks) OnFatal(ctx context.Context, app string, err error) {} diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index 12f9ca37d4e0..886d06faea63 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -153,8 +153,9 @@ func (*testMonitorableApp) Stop() {} func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error { return nil } -func (*testMonitorableApp) State() state.State { return state.State{} } -func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor } +func (*testMonitorableApp) State() state.State { return state.State{} } +func (*testMonitorableApp) SetState(_ state.Status, _ string) {} +func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor } type testMonitor struct { monitorLogs bool diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation.go b/x-pack/elastic-agent/pkg/agent/operation/operation.go index cfc11cceae70..e498990c4bc3 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation.go @@ -36,6 +36,7 @@ type Application interface { Configure(ctx context.Context, config map[string]interface{}) error Monitor() monitoring.Monitor State() state.State + SetState(status state.Status, msg string) } // Descriptor defines a program which needs to be run. diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go index 8fe6af3056e6..c5658c0d0417 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_config.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_config.go @@ -10,6 +10,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" ) var ( @@ -25,19 +26,16 @@ type operationConfig struct { logger *logger.Logger operatorConfig *config.Config cfg map[string]interface{} - eventProcessor callbackHooks } func newOperationConfig( logger *logger.Logger, operatorConfig *config.Config, - cfg map[string]interface{}, - eventProcessor callbackHooks) *operationConfig { + cfg map[string]interface{}) *operationConfig { return &operationConfig{ logger: logger, operatorConfig: operatorConfig, cfg: cfg, - eventProcessor: eventProcessor, } } @@ -55,11 +53,7 @@ func (o *operationConfig) Check(_ Application) (bool, error) { return true, nil func (o *operationConfig) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - err = errors.New(err, - o.Name(), - errors.TypeApplication, - errors.M(errors.MetaKeyAppName, application.Name())) - o.eventProcessor.OnFailing(ctx, application.Name(), err) + application.SetState(state.Failed, err.Error()) } }() return application.Configure(ctx, o.cfg) diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go b/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go index bda22a861e00..6f0bb20b0610 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go @@ -8,11 +8,11 @@ import ( "context" "os" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" ) // operationFetch fetches artifact from preconfigured source @@ -22,22 +22,19 @@ type operationFetch struct { program Descriptor operatorConfig *config.Config downloader download.Downloader - eventProcessor callbackHooks } func newOperationFetch( logger *logger.Logger, program Descriptor, operatorConfig *config.Config, - downloader download.Downloader, - eventProcessor callbackHooks) *operationFetch { + downloader download.Downloader) *operationFetch { return &operationFetch{ logger: logger, program: program, operatorConfig: operatorConfig, downloader: downloader, - eventProcessor: eventProcessor, } } @@ -69,11 +66,7 @@ func (o *operationFetch) Check(_ Application) (bool, error) { func (o *operationFetch) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - err = errors.New(err, - o.Name(), - errors.TypeApplication, - errors.M(errors.MetaKeyAppName, application.Name())) - o.eventProcessor.OnFailing(ctx, application.Name(), err) + application.SetState(state.Failed, err.Error()) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_install.go b/x-pack/elastic-agent/pkg/agent/operation/operation_install.go index 0e045cb15dfb..95a1fbb2ff30 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_install.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_install.go @@ -8,10 +8,10 @@ import ( "context" "os" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" ) // operationInstall installs a artifact from predefined location @@ -21,22 +21,19 @@ type operationInstall struct { program Descriptor operatorConfig *config.Config installer install.Installer - eventProcessor callbackHooks } func newOperationInstall( logger *logger.Logger, program Descriptor, operatorConfig *config.Config, - installer install.Installer, - eventProcessor callbackHooks) *operationInstall { + installer install.Installer) *operationInstall { return &operationInstall{ logger: logger, program: program, operatorConfig: operatorConfig, installer: installer, - eventProcessor: eventProcessor, } } @@ -58,11 +55,7 @@ func (o *operationInstall) Check(_ Application) (bool, error) { func (o *operationInstall) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - err = errors.New(err, - o.Name(), - errors.TypeApplication, - errors.M(errors.MetaKeyAppName, application.Name())) - o.eventProcessor.OnFailing(ctx, application.Name(), err) + application.SetState(state.Failed, err.Error()) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go b/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go index 587d546bb8fd..17bb1d5a85f0 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_remove.go @@ -7,16 +7,15 @@ package operation import ( "context" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" ) // operationRemove uninstall and removes all the bits related to the artifact type operationRemove struct { - eventProcessor callbackHooks } -func newOperationRemove(eventProcessor callbackHooks) *operationRemove { - return &operationRemove{eventProcessor: eventProcessor} +func newOperationRemove() *operationRemove { + return &operationRemove{} } // Name is human readable name identifying an operation @@ -35,11 +34,7 @@ func (o *operationRemove) Check(_ Application) (bool, error) { func (o *operationRemove) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - o.eventProcessor.OnFailing(ctx, application.Name(), err) - err = errors.New(err, - o.Name(), - errors.TypeApplication, - errors.M(errors.MetaKeyAppName, application.Name())) + application.SetState(state.Failed, err.Error()) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_start.go b/x-pack/elastic-agent/pkg/agent/operation/operation_start.go index d6dc4f3d3892..23aa92f87d95 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_start.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_start.go @@ -7,12 +7,10 @@ package operation import ( "context" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" - - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" ) // operationStart start installed process @@ -22,7 +20,6 @@ type operationStart struct { program Descriptor operatorConfig *config.Config cfg map[string]interface{} - eventProcessor callbackHooks pi *process.Info } @@ -31,8 +28,7 @@ func newOperationStart( logger *logger.Logger, program Descriptor, operatorConfig *config.Config, - cfg map[string]interface{}, - eventProcessor callbackHooks) *operationStart { + cfg map[string]interface{}) *operationStart { // TODO: make configurable return &operationStart{ @@ -40,7 +36,6 @@ func newOperationStart( program: program, operatorConfig: operatorConfig, cfg: cfg, - eventProcessor: eventProcessor, } } @@ -63,17 +58,9 @@ func (o *operationStart) Check(application Application) (bool, error) { // Run runs the operation func (o *operationStart) Run(ctx context.Context, application Application) (err error) { - o.eventProcessor.OnStarting(ctx, application.Name()) defer func() { if err != nil { - // kill the process if something failed - err = errors.New(err, - o.Name(), - errors.TypeApplication, - errors.M(errors.MetaKeyAppName, application.Name())) - o.eventProcessor.OnFailing(ctx, application.Name(), err) - } else { - o.eventProcessor.OnRunning(ctx, application.Name()) + application.SetState(state.Failed, err.Error()) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_stop.go b/x-pack/elastic-agent/pkg/agent/operation/operation_stop.go index 222e933b877d..c7433b649fbb 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_stop.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_stop.go @@ -7,11 +7,9 @@ package operation import ( "context" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" - - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" ) // operationStop stops the running process @@ -19,17 +17,14 @@ import ( type operationStop struct { logger *logger.Logger operatorConfig *config.Config - eventProcessor callbackHooks } func newOperationStop( logger *logger.Logger, - operatorConfig *config.Config, - eventProcessor callbackHooks) *operationStop { + operatorConfig *config.Config) *operationStop { return &operationStop{ logger: logger, operatorConfig: operatorConfig, - eventProcessor: eventProcessor, } } @@ -50,19 +45,6 @@ func (o *operationStop) Check(application Application) (bool, error) { // Run runs the operation func (o *operationStop) Run(ctx context.Context, application Application) (err error) { - o.eventProcessor.OnStopping(ctx, application.Name()) - defer func() { - if err != nil { - err = errors.New(err, - o.Name(), - errors.TypeApplication, - errors.M(errors.MetaKeyAppName, application.Name())) - o.eventProcessor.OnFailing(ctx, application.Name(), err) - } else { - o.eventProcessor.OnStopped(ctx, application.Name()) - } - }() - application.Stop() return nil } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go b/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go index 63f987b748a8..32447880434f 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation_verify.go @@ -13,12 +13,12 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" ) // operationVerify verifies downloaded artifact for correct signature // skips if artifact is already installed type operationVerify struct { - eventProcessor callbackHooks program Descriptor operatorConfig *config.Config verifier download.Verifier @@ -27,12 +27,10 @@ type operationVerify struct { func newOperationVerify( program Descriptor, operatorConfig *config.Config, - verifier download.Verifier, - eventProcessor callbackHooks) *operationVerify { + verifier download.Verifier) *operationVerify { return &operationVerify{ program: program, operatorConfig: operatorConfig, - eventProcessor: eventProcessor, verifier: verifier, } } @@ -64,11 +62,7 @@ func (o *operationVerify) Check(_ Application) (bool, error) { func (o *operationVerify) Run(ctx context.Context, application Application) (err error) { defer func() { if err != nil { - err = errors.New(err, - o.Name(), - errors.TypeApplication, - errors.M(errors.MetaKeyAppName, application.Name())) - o.eventProcessor.OnFailing(ctx, application.Name(), err) + application.SetState(state.Failed, err.Error()) } }() diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index 7c7b376cc177..3c74f7b81049 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -11,8 +11,6 @@ import ( "strings" "sync" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" - "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" operatorCfg "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config" @@ -24,6 +22,7 @@ import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" ) @@ -38,16 +37,16 @@ const ( // Enables running sidecars for processes. // TODO: implement retry strategies type Operator struct { - bgContext context.Context - pipelineID string - logger *logger.Logger - config *operatorCfg.Config - handlers map[string]handleFunc - stateResolver *stateresolver.StateResolver - srv *server.Server - eventProcessor callbackHooks - monitor monitoring.Monitor - isMonitoring int + bgContext context.Context + pipelineID string + logger *logger.Logger + config *operatorCfg.Config + handlers map[string]handleFunc + stateResolver *stateresolver.StateResolver + srv *server.Server + reporter state.Reporter + monitor monitoring.Monitor + isMonitoring int apps map[string]Application appsLock sync.Mutex @@ -70,7 +69,7 @@ func NewOperator( installer install.Installer, stateResolver *stateresolver.StateResolver, srv *server.Server, - eventProcessor callbackHooks, + reporter state.Reporter, monitor monitoring.Monitor) (*Operator, error) { operatorConfig := operatorCfg.DefaultConfig() @@ -82,23 +81,19 @@ func NewOperator( return nil, fmt.Errorf("artifacts configuration not provided") } - if eventProcessor == nil { - eventProcessor = &noopCallbackHooks{} - } - operator := &Operator{ - bgContext: ctx, - config: operatorConfig, - pipelineID: pipelineID, - logger: logger, - downloader: fetcher, - verifier: verifier, - installer: installer, - stateResolver: stateResolver, - srv: srv, - apps: make(map[string]Application), - eventProcessor: eventProcessor, - monitor: monitor, + bgContext: ctx, + config: operatorConfig, + pipelineID: pipelineID, + logger: logger, + downloader: fetcher, + verifier: verifier, + installer: installer, + stateResolver: stateResolver, + srv: srv, + apps: make(map[string]Application), + reporter: reporter, + monitor: monitor, } operator.initHandlerMap() @@ -164,12 +159,12 @@ func (o *Operator) start(p Descriptor, cfg map[string]interface{}) (err error) { newRetryableOperations( o.logger, o.config.RetryConfig, - newOperationFetch(o.logger, p, o.config, o.downloader, o.eventProcessor), - newOperationVerify(p, o.config, o.verifier, o.eventProcessor), + newOperationFetch(o.logger, p, o.config, o.downloader), + newOperationVerify(p, o.config, o.verifier), ), - newOperationInstall(o.logger, p, o.config, o.installer, o.eventProcessor), - newOperationStart(o.logger, p, o.config, cfg, o.eventProcessor), - newOperationConfig(o.logger, o.config, cfg, o.eventProcessor), + newOperationInstall(o.logger, p, o.config, o.installer), + newOperationStart(o.logger, p, o.config, cfg), + newOperationConfig(o.logger, o.config, cfg), } return o.runFlow(p, flow) } @@ -177,7 +172,7 @@ func (o *Operator) start(p Descriptor, cfg map[string]interface{}) (err error) { // Stop stops the running process, if process is already stopped it does not return an error func (o *Operator) stop(p Descriptor) (err error) { flow := []operation{ - newOperationStop(o.logger, o.config, o.eventProcessor), + newOperationStop(o.logger, o.config), } return o.runFlow(p, flow) @@ -186,7 +181,7 @@ func (o *Operator) stop(p Descriptor) (err error) { // PushConfig tries to push config to a running process func (o *Operator) pushConfig(p Descriptor, cfg map[string]interface{}) error { flow := []operation{ - newOperationConfig(o.logger, o.config, cfg, o.eventProcessor), + newOperationConfig(o.logger, o.config, cfg), } return o.runFlow(p, flow) @@ -259,7 +254,7 @@ func (o *Operator) getApp(p Descriptor) (Application, error) { o.srv, o.config, o.logger, - o.eventProcessor.OnFailing, + o.reporter, o.monitor) if err != nil { diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/app.go b/x-pack/elastic-agent/pkg/core/plugin/app/app.go index d85f28b49abc..e7356d0b5421 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/app.go @@ -34,24 +34,21 @@ var ( ErrClientNotConfigurable = errors.New("client does not provide configuration", errors.TypeApplication) ) -// ReportFailureFunc is a callback func used to report async failures due to crashes. -type ReportFailureFunc func(context.Context, string, error) - // Application encapsulates a concrete application ran by elastic-agent e.g Beat. type Application struct { - bgContext context.Context - id string - name string - pipelineID string - logLevel string - spec Specifier - srv *server.Server - srvState *server.ApplicationState - limiter *tokenbucket.Bucket - failureReporter ReportFailureFunc - startContext context.Context - tag Taggable - state state.State + bgContext context.Context + id string + name string + pipelineID string + logLevel string + spec Specifier + srv *server.Server + srvState *server.ApplicationState + limiter *tokenbucket.Bucket + startContext context.Context + tag Taggable + state state.State + reporter state.Reporter uid int gid int @@ -79,7 +76,7 @@ func NewApplication( srv *server.Server, cfg *config.Config, logger *logger.Logger, - failureReporter ReportFailureFunc, + reporter state.Reporter, monitor monitoring.Monitor) (*Application, error) { s := spec.Spec() @@ -90,22 +87,22 @@ func NewApplication( b, _ := tokenbucket.NewTokenBucket(ctx, 3, 3, 1*time.Second) return &Application{ - bgContext: ctx, - id: id, - name: appName, - pipelineID: pipelineID, - logLevel: logLevel, - spec: spec, - srv: srv, - processConfig: cfg.ProcessConfig, - downloadConfig: cfg.DownloadConfig, - retryConfig: cfg.RetryConfig, - logger: logger, - limiter: b, - failureReporter: failureReporter, - monitor: monitor, - uid: uid, - gid: gid, + bgContext: ctx, + id: id, + name: appName, + pipelineID: pipelineID, + logLevel: logLevel, + spec: spec, + srv: srv, + processConfig: cfg.ProcessConfig, + downloadConfig: cfg.DownloadConfig, + retryConfig: cfg.RetryConfig, + logger: logger, + limiter: b, + reporter: reporter, + monitor: monitor, + uid: uid, + gid: gid, }, nil } @@ -153,8 +150,14 @@ func (a *Application) Stop() { // cleanup drops a.monitor.Cleanup(a.name, a.pipelineID) } - a.state.Status = state.Stopped - a.state.Message = "Stopped" + a.setState(state.Stopped, "Stopped") +} + +// SetState sets the status of the application. +func (a *Application) SetState(status state.Status, msg string) { + a.appLock.Lock() + defer a.appLock.Unlock() + a.setState(status, msg) } func (a *Application) watch(ctx context.Context, p Taggable, proc *process.Info, cfg map[string]interface{}) { @@ -170,6 +173,11 @@ func (a *Application) watch(ctx context.Context, p Taggable, proc *process.Info, } a.appLock.Lock() + if a.state.ProcessInfo != proc { + // already another process started, another watcher is watching instead + a.appLock.Unlock() + return + } a.state.ProcessInfo = nil srvState := a.srvState @@ -178,15 +186,13 @@ func (a *Application) watch(ctx context.Context, p Taggable, proc *process.Info, return } - msg := fmt.Sprintf("Exited with code: %d", procState.ExitCode()) - a.state.Status = state.Crashed - a.state.Message = msg - a.appLock.Unlock() + msg := fmt.Sprintf("exited with code: %d", procState.ExitCode()) + a.setState(state.Crashed, msg) - // it was a crash, report it async not to block - // process management with networking issues - go a.reportCrash(ctx) - a.Start(ctx, p, cfg) + // it was a crash, cleanup anything required + go a.cleanUp() + a.start(ctx, p, cfg) + a.appLock.Unlock() }() } @@ -206,16 +212,35 @@ func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState { return resChan } -func (a *Application) reportCrash(ctx context.Context) { - a.monitor.Cleanup(a.name, a.pipelineID) +func (a *Application) setStateFromProto(pstatus proto.StateObserved_Status, msg string) { + var status state.Status + switch pstatus { + case proto.StateObserved_STARTING: + status = state.Starting + case proto.StateObserved_CONFIGURING: + status = state.Configuring + case proto.StateObserved_HEALTHY: + status = state.Running + case proto.StateObserved_DEGRADED: + status = state.Degraded + case proto.StateObserved_FAILED: + status = state.Failed + case proto.StateObserved_STOPPING: + status = state.Stopping + } + a.setState(status, msg) +} - // TODO: reporting crash - if a.failureReporter != nil { - crashError := errors.New( - fmt.Sprintf("application '%s' crashed", a.id), - errors.TypeApplicationCrash, - errors.M(errors.MetaKeyAppName, a.name), - errors.M(errors.MetaKeyAppName, a.id)) - a.failureReporter(ctx, a.name, crashError) +func (a *Application) setState(status state.Status, msg string) { + if a.state.Status != status || a.state.Message != msg { + a.state.Status = status + a.state.Message = msg + if a.reporter != nil { + go a.reporter.OnStateChange(a.id, a.name, a.state) + } } } + +func (a *Application) cleanUp() { + a.monitor.Cleanup(a.name, a.pipelineID) +} diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/start.go b/x-pack/elastic-agent/pkg/core/plugin/app/start.go index aafaff2c4cc1..57c20edae75c 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/start.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/start.go @@ -23,7 +23,15 @@ import ( ) // Start starts the application with a specified config. -func (a *Application) Start(ctx context.Context, t Taggable, cfg map[string]interface{}) (err error) { +func (a *Application) Start(ctx context.Context, t Taggable, cfg map[string]interface{}) error { + a.appLock.Lock() + defer a.appLock.Unlock() + + return a.start(ctx, t, cfg) +} + +// Start starts the application without grabbing the lock. +func (a *Application) start(ctx context.Context, t Taggable, cfg map[string]interface{}) (err error) { defer func() { if err != nil { // inject App metadata @@ -31,44 +39,37 @@ func (a *Application) Start(ctx context.Context, t Taggable, cfg map[string]inte } }() + // already started if not stopped or crashed + if a.state.Status != state.Stopped && a.state.Status != state.Crashed && a.state.Status != state.Failed { + return nil + } + cfgStr, err := yaml.Marshal(cfg) if err != nil { return err } - // because `Start` can be called by `ApplicationStatusHandler` to perform a restart on failure - // the locking needs to be handled in the correct order. - a.appLock.Lock() a.startContext = ctx a.tag = t srvState := a.srvState - a.appLock.Unlock() // Failed applications can be started again. if srvState != nil { - srvState.SetStatus(proto.StateObserved_STARTING, "Starting") + a.setState(state.Starting, "Starting") + srvState.SetStatus(proto.StateObserved_STARTING, a.state.Message) srvState.UpdateConfig(string(cfgStr)) } else { - a.appLock.Lock() a.srvState, err = a.srv.Register(a, string(cfgStr)) if err != nil { return err } - a.appLock.Unlock() } - // now that `SetStatus` would call `ApplicationStatusHandler` has occurred the - // reset of `Start` can be held by the lock. - a.appLock.Lock() - defer a.appLock.Unlock() - if a.state.Status != state.Stopped { // restarting as it was previously in a different state - a.state.Status = state.Restarting - a.state.Message = "Restarting" + a.setState(state.Restarting, "Restarting") } else { - a.state.Status = state.Starting - a.state.Message = "Starting" + a.setState(state.Starting, "Starting") } defer func() { diff --git a/x-pack/elastic-agent/pkg/core/plugin/app/status.go b/x-pack/elastic-agent/pkg/core/plugin/app/status.go index a3039af0c734..a88cc73684f2 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/app/status.go +++ b/x-pack/elastic-agent/pkg/core/plugin/app/status.go @@ -5,7 +5,6 @@ package app import ( - "context" "fmt" "gopkg.in/yaml.v2" @@ -13,7 +12,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/proto" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" - pstate "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server" ) @@ -27,8 +26,8 @@ type ApplicationStatusHandler struct{} // OnStatusChange is the handler called by the GRPC server code. // // It updates the status of the application and handles restarting the application is needed. -func (*ApplicationStatusHandler) OnStatusChange(state *server.ApplicationState, status proto.StateObserved_Status, msg string) { - app, ok := state.App().(*Application) +func (*ApplicationStatusHandler) OnStatusChange(s *server.ApplicationState, status proto.StateObserved_Status, msg string) { + app, ok := s.App().(*Application) if !ok { panic(errors.New("only *Application can be registered when using the ApplicationStatusHandler", errors.TypeUnexpected)) } @@ -37,23 +36,21 @@ func (*ApplicationStatusHandler) OnStatusChange(state *server.ApplicationState, // If the application is stopped, do not update the state. Stopped is a final state // and should not be overridden. - if app.state.Status == pstate.Stopped { + if app.state.Status == state.Stopped { app.appLock.Unlock() return } - app.state.UpdateFromProto(status) - app.state.Message = msg + app.setStateFromProto(status, msg) if status == proto.StateObserved_FAILED { // ignore when expected state is stopping - if state.Expected() == proto.StateExpected_STOPPING { + if s.Expected() == proto.StateExpected_STOPPING { app.appLock.Unlock() return } - // it was a crash, report it async not to block - // process management with networking issues - go app.reportCrash(context.Background()) + // it was a crash, cleanup anything required + go app.cleanUp() // kill the process if app.state.ProcessInfo != nil { @@ -62,21 +59,15 @@ func (*ApplicationStatusHandler) OnStatusChange(state *server.ApplicationState, } ctx := app.startContext tag := app.tag - app.appLock.Unlock() // it was marshalled to pass into the state, so unmarshall will always succeed var cfg map[string]interface{} - _ = yaml.Unmarshal([]byte(state.Config()), &cfg) + _ = yaml.Unmarshal([]byte(s.Config()), &cfg) - err := app.Start(ctx, tag, cfg) + err := app.start(ctx, tag, cfg) if err != nil { - app.logger.Error(errors.New( - fmt.Sprintf("application '%s' failed to restart", app.id), - errors.TypeApplicationCrash, - errors.M(errors.MetaKeyAppName, app.name), - errors.M(errors.MetaKeyAppName, app.id))) + app.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err)) } - return } app.appLock.Unlock() } diff --git a/x-pack/elastic-agent/pkg/core/plugin/state/state.go b/x-pack/elastic-agent/pkg/core/plugin/state/state.go index a3a4419e6fb6..378017e4e8e4 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/state/state.go +++ b/x-pack/elastic-agent/pkg/core/plugin/state/state.go @@ -6,7 +6,6 @@ package state import ( "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" ) // Status describes the current status of the application process. @@ -40,20 +39,8 @@ type State struct { Message string } -// UpdateFromProto updates the status from the status from the GRPC protocol. -func (s *State) UpdateFromProto(status proto.StateObserved_Status) { - switch status { - case proto.StateObserved_STARTING: - s.Status = Starting - case proto.StateObserved_CONFIGURING: - s.Status = Configuring - case proto.StateObserved_HEALTHY: - s.Status = Running - case proto.StateObserved_DEGRADED: - s.Status = Degraded - case proto.StateObserved_FAILED: - s.Status = Failed - case proto.StateObserved_STOPPING: - s.Status = Stopping - } +// Reporter is interface that is called when a state is changed. +type Reporter interface { + // OnStateChange is called when state changes. + OnStateChange(id string, name string, state State) } diff --git a/x-pack/elastic-agent/pkg/core/server/server.go b/x-pack/elastic-agent/pkg/core/server/server.go index 38db57234b65..4be292abb19b 100644 --- a/x-pack/elastic-agent/pkg/core/server/server.go +++ b/x-pack/elastic-agent/pkg/core/server/server.go @@ -128,6 +128,11 @@ func New(logger *logger.Logger, listenAddr string, handler Handler) (*Server, er // Start starts the GRPC endpoint and accepts new connections. func (s *Server) Start() error { + if s.server != nil { + // already started + return nil + } + lis, err := net.Listen("tcp", s.listenAddr) if err != nil { return err @@ -652,16 +657,9 @@ func (as *ApplicationState) Status() (proto.StateObserved_Status, string) { // This status will be overwritten by the client if it reconnects and updates it status. func (as *ApplicationState) SetStatus(status proto.StateObserved_Status, msg string) { as.checkinLock.RLock() - prevStatus := as.status - prevMessage := as.statusMessage as.status = status as.statusMessage = msg as.checkinLock.RUnlock() - - // alert the service handler that status has changed for the application - if prevStatus != status || prevMessage != msg { - as.srv.handler.OnStatusChange(as, status, msg) - } } // updateStatus updates the current observed status from the application, sends the expected state back to the diff --git a/x-pack/elastic-agent/pkg/reporter/reporter.go b/x-pack/elastic-agent/pkg/reporter/reporter.go index d10cb5ece051..b27488d6a43d 100644 --- a/x-pack/elastic-agent/pkg/reporter/reporter.go +++ b/x-pack/elastic-agent/pkg/reporter/reporter.go @@ -9,6 +9,8 @@ import ( "fmt" "time" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" + "github.com/hashicorp/go-multierror" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" @@ -23,16 +25,20 @@ const ( // EventTypeActionResult is an record type describing applications result of an action EventTypeActionResult = "ACTION_RESULT" - // EventSubTypeStarting is an event type indicating application is starting + // EventSubTypeStopped is an event type indicating application is stopped. + EventSubTypeStopped = "STOPPED" + // EventSubTypeStarting is an event type indicating application is starting. EventSubTypeStarting = "STARTING" - // EventSubTypeInProgress is an event type indicating application is in progress + // EventSubTypeInProgress is an event type indicating application is in progress. EventSubTypeInProgress = "IN_PROGRESS" // EventSubTypeConfig is an event indicating application config related event. EventSubTypeConfig = "CONFIG" - // EventSubTypeStopping is an event type indicating application is stopping + // EventSubTypeRunning is an event indicating application running related event. + EventSubTypeRunning = "RUNNING" + // EventSubTypeFailed is an event type indicating application is failed. + EventSubTypeFailed = "FAILED" + // EventSubTypeStopping is an event type indicating application is stopping. EventSubTypeStopping = "STOPPING" - // EventSubTypeStopped is an event type indicating application is stopped - EventSubTypeStopped = "STOPPED" ) type agentInfo interface { @@ -42,6 +48,7 @@ type agentInfo interface { // Reporter uses multiple backends which needs to be non-blocking // to report various events. type Reporter struct { + ctx context.Context info agentInfo backends []Backend @@ -51,6 +58,7 @@ type Reporter struct { // NewReporter creates a new reporter with provided set of Backends. func NewReporter(ctx context.Context, logger *logger.Logger, info agentInfo, backends ...Backend) *Reporter { return &Reporter{ + ctx: ctx, info: info, backends: backends, l: logger, @@ -64,46 +72,10 @@ func (r *Reporter) Close() { } } -// OnStarting reports application starting event. -func (r *Reporter) OnStarting(ctx context.Context, application string) { - msg := fmt.Sprintf("Application: %s[%s]: State change: STARTING", application, r.info.AgentID()) - rec := generateRecord(EventTypeState, EventSubTypeStarting, msg) - r.report(ctx, rec) -} - -// OnRunning reports application running event. -func (r *Reporter) OnRunning(ctx context.Context, application string) { - msg := fmt.Sprintf("Application: %s[%s]: State change: IN_PROGRESS", application, r.info.AgentID()) - rec := generateRecord(EventTypeState, EventSubTypeInProgress, msg) - r.report(ctx, rec) -} - -// OnFailing reports application failed event. -func (r *Reporter) OnFailing(ctx context.Context, application string, err error) { - msg := fmt.Sprintf("Application: %s[%s]: %v", application, r.info.AgentID(), err) - rec := generateRecord(EventTypeError, EventSubTypeConfig, msg) - r.report(ctx, rec) -} - -// OnStopping reports application stopped event. -func (r *Reporter) OnStopping(ctx context.Context, application string) { - msg := fmt.Sprintf("Application: %s[%s]: State change: STOPPING", application, r.info.AgentID()) - rec := generateRecord(EventTypeState, EventSubTypeStopping, msg) - r.report(ctx, rec) -} - -// OnStopped reports application stopped event. -func (r *Reporter) OnStopped(ctx context.Context, application string) { - msg := fmt.Sprintf("Application: %s[%s]: State change: STOPPED", application, r.info.AgentID()) - rec := generateRecord(EventTypeState, EventSubTypeStopped, msg) - r.report(ctx, rec) -} - -// OnFatal reports applications fatal event. -func (r *Reporter) OnFatal(ctx context.Context, application string, err error) { - msg := fmt.Sprintf("Application: %s[%s]: %v", application, r.info.AgentID(), err) - rec := generateRecord(EventTypeError, EventSubTypeConfig, msg) - r.report(ctx, rec) +// OnStateChange called when state of an application changes. +func (r *Reporter) OnStateChange(id string, name string, state state.State) { + rec := generateRecord(r.info.AgentID(), id, name, state) + r.report(r.ctx, rec) } func (r *Reporter) report(ctx context.Context, e event) { @@ -120,11 +92,54 @@ func (r *Reporter) report(ctx context.Context, e event) { } } -func generateRecord(eventype, subType, message string) event { +func generateRecord(agentID string, id string, name string, s state.State) event { + eventType := EventTypeState + + var subType string + var subTypeText string + switch s.Status { + case state.Stopped: + subType = EventSubTypeStopped + subTypeText = EventSubTypeStopped + case state.Starting: + subType = EventSubTypeStarting + subTypeText = EventSubTypeStarting + case state.Configuring: + subType = EventSubTypeConfig + subTypeText = EventSubTypeConfig + case state.Running: + subType = EventSubTypeRunning + subTypeText = EventSubTypeRunning + case state.Degraded: + // Fleet doesn't understand degraded + subType = EventSubTypeRunning + subTypeText = "DEGRADED" + case state.Failed: + eventType = EventTypeError + subType = EventSubTypeFailed + subTypeText = EventSubTypeFailed + case state.Crashed: + eventType = EventTypeError + subType = EventSubTypeFailed + subTypeText = "CRASHED" + case state.Stopping: + subType = EventSubTypeStopping + subTypeText = EventSubTypeStopping + case state.Restarting: + subType = EventSubTypeStarting + subTypeText = "RESTARTING" + } + + err := errors.New( + fmt.Errorf(s.Message), + fmt.Sprintf("Application: %s[%s]: State changed to %s", id, agentID, subTypeText), + errors.TypeApplication, + errors.M(errors.MetaKeyAppID, id), + errors.M(errors.MetaKeyAppName, name)) return event{ - eventype: eventype, + eventype: eventType, subType: subType, timestamp: time.Now(), - message: message, + message: err.Error(), } } diff --git a/x-pack/elastic-agent/pkg/reporter/reporter_test.go b/x-pack/elastic-agent/pkg/reporter/reporter_test.go index 5f95824279cc..754e2972884f 100644 --- a/x-pack/elastic-agent/pkg/reporter/reporter_test.go +++ b/x-pack/elastic-agent/pkg/reporter/reporter_test.go @@ -6,8 +6,13 @@ package reporter import ( "context" - "errors" + "fmt" + "strings" "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state" ) var result Event @@ -24,67 +29,92 @@ type info struct{} func (*info) AgentID() string { return "id" } +type testScenario struct { + Status state.Status + StatusMessage string + EventType string + EventSubType string + EventMessage string +} + func TestTypes(t *testing.T) { rep := NewReporter(context.Background(), nil, &info{}, &testReporter{}) - // test starting - rep.OnStarting(context.Background(), "a1") - if r := result.Type(); r != EventTypeState { - t.Errorf("OnStarting: expected record type '%v', got '%v'", EventTypeState, r) + scenarios := []testScenario{ + { + Status: state.Stopped, + StatusMessage: "Stopped", + EventType: EventTypeState, + EventSubType: EventSubTypeStopped, + EventMessage: "Application: a-stopped[id]: State changed to STOPPED: Stopped", + }, + { + Status: state.Starting, + StatusMessage: "Starting", + EventType: EventTypeState, + EventSubType: EventSubTypeStarting, + EventMessage: "Application: a-starting[id]: State changed to STARTING: Starting", + }, + { + Status: state.Configuring, + StatusMessage: "Configuring", + EventType: EventTypeState, + EventSubType: EventSubTypeConfig, + EventMessage: "Application: a-configuring[id]: State changed to CONFIG: Configuring", + }, + { + Status: state.Running, + StatusMessage: "Running", + EventType: EventTypeState, + EventSubType: EventSubTypeRunning, + EventMessage: "Application: a-running[id]: State changed to RUNNING: Running", + }, + { + Status: state.Degraded, + StatusMessage: "Degraded", + EventType: EventTypeState, + EventSubType: EventSubTypeRunning, + EventMessage: "Application: a-degraded[id]: State changed to DEGRADED: Degraded", + }, + { + Status: state.Failed, + StatusMessage: "Failed", + EventType: EventTypeError, + EventSubType: EventSubTypeFailed, + EventMessage: "Application: a-failed[id]: State changed to FAILED: Failed", + }, + { + Status: state.Crashed, + StatusMessage: "Crashed", + EventType: EventTypeError, + EventSubType: EventSubTypeFailed, + EventMessage: "Application: a-crashed[id]: State changed to CRASHED: Crashed", + }, + { + Status: state.Stopping, + StatusMessage: "Stopping", + EventType: EventTypeState, + EventSubType: EventSubTypeStopping, + EventMessage: "Application: a-stopping[id]: State changed to STOPPING: Stopping", + }, + { + Status: state.Restarting, + StatusMessage: "Restarting", + EventType: EventTypeState, + EventSubType: EventSubTypeStarting, + EventMessage: "Application: a-restarting[id]: State changed to RESTARTING: Restarting", + }, } - - if r := result.SubType(); r != EventSubTypeStarting { - t.Errorf("OnStarting: expected event type '%v', got '%v'", EventSubTypeStarting, r) - } - - // test in progress - rep.OnRunning(context.Background(), "a2") - if r := result.Type(); r != EventTypeState { - t.Errorf("OnRunning: expected record type '%v', got '%v'", EventTypeState, r) - } - - if r := result.SubType(); r != EventSubTypeInProgress { - t.Errorf("OnRunning: expected event type '%v', got '%v'", EventSubTypeStarting, r) - } - - // test stopping - rep.OnStopping(context.Background(), "a3") - if r := result.Type(); r != EventTypeState { - t.Errorf("OnStopping: expected record type '%v', got '%v'", EventTypeState, r) - } - - if r := result.SubType(); r != EventSubTypeStopping { - t.Errorf("OnStopping: expected event type '%v', got '%v'", EventSubTypeStarting, r) - } - - // test stopped - rep.OnStopped(context.Background(), "a4") - if r := result.Type(); r != EventTypeState { - t.Errorf("OnStopped: expected record type '%v', got '%v'", EventTypeState, r) - } - - if r := result.SubType(); r != EventSubTypeStopped { - t.Errorf("OnStopped: expected event type '%v', got '%v'", EventSubTypeStarting, r) - } - - // test failing - err := errors.New("e1") - rep.OnFailing(context.Background(), "a5", err) - if r := result.Type(); r != EventTypeError { - t.Errorf("OnFailing: expected record type '%v', got '%v'", EventTypeState, r) - } - - if r := result.SubType(); r != EventSubTypeConfig { - t.Errorf("OnFailing: expected event type '%v', got '%v'", EventSubTypeStarting, r) - } - - // test fatal - err = errors.New("e2") - rep.OnFatal(context.Background(), "a6", err) - if r := result.Type(); r != EventTypeError { - t.Errorf("OnFatal: expected record type '%v', got '%v'", EventTypeState, r) - } - - if r := result.SubType(); r != EventSubTypeConfig { - t.Errorf("OnFatal: expected event type '%v', got '%v'", EventSubTypeStarting, r) + for _, scenario := range scenarios { + t.Run(scenario.StatusMessage, func(t *testing.T) { + appID := fmt.Sprintf("a-%s", strings.ToLower(scenario.StatusMessage)) + appName := fmt.Sprintf("app-%s", strings.ToLower(scenario.StatusMessage)) + rep.OnStateChange(appID, appName, state.State{ + Status: scenario.Status, + Message: scenario.StatusMessage, + }) + assert.Equal(t, scenario.EventType, result.Type()) + assert.Equal(t, scenario.EventSubType, result.SubType()) + assert.Equal(t, scenario.EventMessage, result.Message()) + }) } } diff --git a/x-pack/libbeat/management/fleet/manager.go b/x-pack/libbeat/management/fleet/manager.go index 9b20b17bc48e..0fa205220e4d 100644 --- a/x-pack/libbeat/management/fleet/manager.go +++ b/x-pack/libbeat/management/fleet/manager.go @@ -161,16 +161,15 @@ func (cm *Manager) OnConfig(s string) { blocks, err := cm.toConfigBlocks(configMap) if err != nil { - err = errors.Wrap(err, "could not apply the configuration") + err = errors.Wrap(err, "failed to parse configuration") cm.logger.Error(err) cm.UpdateStatus(management.Failed, err.Error()) return } if errs := cm.apply(blocks); !errs.IsEmpty() { - err = errors.Wrap(err, "could not apply the configuration") - cm.logger.Error(err) - cm.UpdateStatus(management.Failed, err.Error()) + // `cm.apply` already logs the errors; currently allow beat to run degraded + cm.UpdateStatus(management.Degraded, errs.Error()) return }