Skip to content

Commit

Permalink
[Agent] Improve the handling of failed applications, fixed crash in F…
Browse files Browse the repository at this point in the history
…leetManager, report all state changes to Fleet (elastic#19178)

* Improve the handling of restarting applications on failure and crashes.

* Fix reporting in the fleet manager. Fix reporting in Agent to push all state changes of the application to fleet.

* Fix imports.
  • Loading branch information
blakerouse authored Jun 15, 2020
1 parent e76e914 commit a655d2e
Show file tree
Hide file tree
Showing 21 changed files with 339 additions and 404 deletions.
11 changes: 7 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,13 @@ func newManaged(
managedApplication.bgContext, managedApplication.cancelCtxFn = context.WithCancel(ctx)
managedApplication.srv, err = server.NewFromConfig(log, rawConfig, &app.ApplicationStatusHandler{})
if err != nil {
return nil, errors.New(err, "initialize GRPC listener")
return nil, errors.New(err, "initialize GRPC listener", errors.TypeNetwork)
}
// must start before `Start` is called as Fleet will already try to start applications
// before `Start` is even called.
err = managedApplication.srv.Start()
if err != nil {
return nil, errors.New(err, "starting GRPC listener", errors.TypeNetwork)
}

logR := logreporter.NewReporter(log, cfg.Reporting.Log)
Expand Down Expand Up @@ -208,9 +214,6 @@ func newManaged(
// Start starts a managed elastic-agent.
func (m *Managed) Start() error {
m.log.Info("Agent is starting")
if err := m.srv.Start(); err != nil {
return err
}
m.gateway.Start()
return nil
}
Expand Down
29 changes: 3 additions & 26 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ package application

import (
"context"
"io"
"net/http"
"net/url"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation"
Expand All @@ -19,30 +16,10 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/app/monitoring"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
)

// EventProcessor is an processor of application event
type reporter interface {
OnStarting(ctx context.Context, app string)
OnRunning(ctx context.Context, app string)
OnFailing(ctx context.Context, app string, err error)
OnStopping(ctx context.Context, app string)
OnStopped(ctx context.Context, app string)
OnFatal(ctx context.Context, app string, err error)
}

type sender interface {
Send(
ctx context.Context,
method string,
path string,
params url.Values,
headers http.Header,
body io.Reader,
) (*http.Response, error)
}

type operatorStream struct {
configHandler ConfigHandler
log *logger.Logger
Expand All @@ -57,7 +34,7 @@ func (b *operatorStream) Execute(cfg *configRequest) error {
return b.configHandler.HandleConfig(cfg)
}

func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
operator, err := newOperator(ctx, log, id, cfg, srv, r, m)
Expand All @@ -72,7 +49,7 @@ func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server,
}
}

func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, srv *server.Server, r reporter, m monitoring.Monitor) (*operation.Operator, error) {
func newOperator(ctx context.Context, log *logger.Logger, id routingKey, config *config.Config, srv *server.Server, r state.Reporter, m monitoring.Monitor) (*operation.Operator, error) {
operatorConfig := operatorCfg.DefaultConfig()
if err := config.Unpack(&operatorConfig); err != nil {
return nil, err
Expand Down
26 changes: 0 additions & 26 deletions x-pack/elastic-agent/pkg/agent/operation/event_processor.go

This file was deleted.

5 changes: 3 additions & 2 deletions x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,9 @@ func (*testMonitorableApp) Stop() {}
func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error {
return nil
}
func (*testMonitorableApp) State() state.State { return state.State{} }
func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor }
func (*testMonitorableApp) State() state.State { return state.State{} }
func (*testMonitorableApp) SetState(_ state.Status, _ string) {}
func (a *testMonitorableApp) Monitor() monitoring.Monitor { return a.monitor }

type testMonitor struct {
monitorLogs bool
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type Application interface {
Configure(ctx context.Context, config map[string]interface{}) error
Monitor() monitoring.Monitor
State() state.State
SetState(status state.Status, msg string)
}

// Descriptor defines a program which needs to be run.
Expand Down
12 changes: 3 additions & 9 deletions x-pack/elastic-agent/pkg/agent/operation/operation_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state"
)

var (
Expand All @@ -25,19 +26,16 @@ type operationConfig struct {
logger *logger.Logger
operatorConfig *config.Config
cfg map[string]interface{}
eventProcessor callbackHooks
}

func newOperationConfig(
logger *logger.Logger,
operatorConfig *config.Config,
cfg map[string]interface{},
eventProcessor callbackHooks) *operationConfig {
cfg map[string]interface{}) *operationConfig {
return &operationConfig{
logger: logger,
operatorConfig: operatorConfig,
cfg: cfg,
eventProcessor: eventProcessor,
}
}

Expand All @@ -55,11 +53,7 @@ func (o *operationConfig) Check(_ Application) (bool, error) { return true, nil
func (o *operationConfig) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
err = errors.New(err,
o.Name(),
errors.TypeApplication,
errors.M(errors.MetaKeyAppName, application.Name()))
o.eventProcessor.OnFailing(ctx, application.Name(), err)
application.SetState(state.Failed, err.Error())
}
}()
return application.Configure(ctx, o.cfg)
Expand Down
13 changes: 3 additions & 10 deletions x-pack/elastic-agent/pkg/agent/operation/operation_fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"context"
"os"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/download"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state"
)

// operationFetch fetches artifact from preconfigured source
Expand All @@ -22,22 +22,19 @@ type operationFetch struct {
program Descriptor
operatorConfig *config.Config
downloader download.Downloader
eventProcessor callbackHooks
}

func newOperationFetch(
logger *logger.Logger,
program Descriptor,
operatorConfig *config.Config,
downloader download.Downloader,
eventProcessor callbackHooks) *operationFetch {
downloader download.Downloader) *operationFetch {

return &operationFetch{
logger: logger,
program: program,
operatorConfig: operatorConfig,
downloader: downloader,
eventProcessor: eventProcessor,
}
}

Expand Down Expand Up @@ -69,11 +66,7 @@ func (o *operationFetch) Check(_ Application) (bool, error) {
func (o *operationFetch) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
err = errors.New(err,
o.Name(),
errors.TypeApplication,
errors.M(errors.MetaKeyAppName, application.Name()))
o.eventProcessor.OnFailing(ctx, application.Name(), err)
application.SetState(state.Failed, err.Error())
}
}()

Expand Down
13 changes: 3 additions & 10 deletions x-pack/elastic-agent/pkg/agent/operation/operation_install.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"context"
"os"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/artifact/install"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state"
)

// operationInstall installs a artifact from predefined location
Expand All @@ -21,22 +21,19 @@ type operationInstall struct {
program Descriptor
operatorConfig *config.Config
installer install.Installer
eventProcessor callbackHooks
}

func newOperationInstall(
logger *logger.Logger,
program Descriptor,
operatorConfig *config.Config,
installer install.Installer,
eventProcessor callbackHooks) *operationInstall {
installer install.Installer) *operationInstall {

return &operationInstall{
logger: logger,
program: program,
operatorConfig: operatorConfig,
installer: installer,
eventProcessor: eventProcessor,
}
}

Expand All @@ -58,11 +55,7 @@ func (o *operationInstall) Check(_ Application) (bool, error) {
func (o *operationInstall) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
err = errors.New(err,
o.Name(),
errors.TypeApplication,
errors.M(errors.MetaKeyAppName, application.Name()))
o.eventProcessor.OnFailing(ctx, application.Name(), err)
application.SetState(state.Failed, err.Error())
}
}()

Expand Down
13 changes: 4 additions & 9 deletions x-pack/elastic-agent/pkg/agent/operation/operation_remove.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,15 @@ package operation
import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state"
)

// operationRemove uninstall and removes all the bits related to the artifact
type operationRemove struct {
eventProcessor callbackHooks
}

func newOperationRemove(eventProcessor callbackHooks) *operationRemove {
return &operationRemove{eventProcessor: eventProcessor}
func newOperationRemove() *operationRemove {
return &operationRemove{}
}

// Name is human readable name identifying an operation
Expand All @@ -35,11 +34,7 @@ func (o *operationRemove) Check(_ Application) (bool, error) {
func (o *operationRemove) Run(ctx context.Context, application Application) (err error) {
defer func() {
if err != nil {
o.eventProcessor.OnFailing(ctx, application.Name(), err)
err = errors.New(err,
o.Name(),
errors.TypeApplication,
errors.M(errors.MetaKeyAppName, application.Name()))
application.SetState(state.Failed, err.Error())
}
}()

Expand Down
19 changes: 3 additions & 16 deletions x-pack/elastic-agent/pkg/agent/operation/operation_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@ package operation
import (
"context"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/operation/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/state"
)

// operationStart start installed process
Expand All @@ -22,7 +20,6 @@ type operationStart struct {
program Descriptor
operatorConfig *config.Config
cfg map[string]interface{}
eventProcessor callbackHooks

pi *process.Info
}
Expand All @@ -31,16 +28,14 @@ func newOperationStart(
logger *logger.Logger,
program Descriptor,
operatorConfig *config.Config,
cfg map[string]interface{},
eventProcessor callbackHooks) *operationStart {
cfg map[string]interface{}) *operationStart {
// TODO: make configurable

return &operationStart{
logger: logger,
program: program,
operatorConfig: operatorConfig,
cfg: cfg,
eventProcessor: eventProcessor,
}
}

Expand All @@ -63,17 +58,9 @@ func (o *operationStart) Check(application Application) (bool, error) {

// Run runs the operation
func (o *operationStart) Run(ctx context.Context, application Application) (err error) {
o.eventProcessor.OnStarting(ctx, application.Name())
defer func() {
if err != nil {
// kill the process if something failed
err = errors.New(err,
o.Name(),
errors.TypeApplication,
errors.M(errors.MetaKeyAppName, application.Name()))
o.eventProcessor.OnFailing(ctx, application.Name(), err)
} else {
o.eventProcessor.OnRunning(ctx, application.Name())
application.SetState(state.Failed, err.Error())
}
}()

Expand Down
Loading

0 comments on commit a655d2e

Please sign in to comment.