diff --git a/x-pack/elastic-agent/CHANGELOG.asciidoc b/x-pack/elastic-agent/CHANGELOG.asciidoc index b926202727a9..6677424b86e2 100644 --- a/x-pack/elastic-agent/CHANGELOG.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.asciidoc @@ -43,6 +43,7 @@ - Avoid watching monitor logs {pull}18723[18723] - Guard against empty stream.datasource and namespace {pull}18769[18769] - Fix install service script for windows {pull}18814[18814] +- Properly stops subprocess on shutdown {pull}19567[19567] ==== New features diff --git a/x-pack/elastic-agent/pkg/agent/application/local_mode.go b/x-pack/elastic-agent/pkg/agent/application/local_mode.go index dc7a81e198af..9af112c43583 100644 --- a/x-pack/elastic-agent/pkg/agent/application/local_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/local_mode.go @@ -23,9 +23,10 @@ import ( type emitterFunc func(*config.Config) error -// ConfigHandler is capable of handling config and perform actions at it. +// ConfigHandler is capable of handling config, perform actions at it, shutdown any long running process. type ConfigHandler interface { HandleConfig(configrequest.Request) error + Shutdown() } type discoverFunc func() ([]string, error) @@ -39,6 +40,7 @@ type Local struct { bgContext context.Context cancelCtxFn context.CancelFunc log *logger.Logger + router *router source source agentInfo *info.AgentInfo srv *server.Server @@ -97,6 +99,7 @@ func newLocal( if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } + localApplication.router = router discover := discoverer(pathConfigFile, c.Management.Path) emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}, monitor) @@ -132,9 +135,11 @@ func (l *Local) Start() error { // Stop stops a local agent. func (l *Local) Stop() error { + err := l.source.Stop() l.cancelCtxFn() + l.router.Shutdown() l.srv.Stop() - return l.source.Stop() + return err } // AgentInfo retrieves agent information. diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index 6ecd6321cb23..1d9efc5683d2 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -46,6 +46,7 @@ type Managed struct { api apiClient agentInfo *info.AgentInfo gateway *fleetGateway + router *router srv *server.Server } @@ -140,6 +141,7 @@ func newManaged( if err != nil { return nil, errors.New(err, "fail to initialize pipeline router") } + managedApplication.router = router emit := emitter( log, @@ -221,6 +223,7 @@ func (m *Managed) Start() error { func (m *Managed) Stop() error { defer m.log.Info("Agent is stopped") m.cancelCtxFn() + m.router.Shutdown() m.srv.Stop() return nil } diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go index b5303d0dc605..15af254beafb 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go @@ -86,6 +86,8 @@ func (m *mockStreamStore) Close() error { return nil } +func (m *mockStreamStore) Shutdown() {} + const fleetResponse = ` { "action": "checkin", diff --git a/x-pack/elastic-agent/pkg/agent/application/router.go b/x-pack/elastic-agent/pkg/agent/application/router.go index b48156d7d48c..a86e0aef1dfc 100644 --- a/x-pack/elastic-agent/pkg/agent/application/router.go +++ b/x-pack/elastic-agent/pkg/agent/application/router.go @@ -21,6 +21,7 @@ type routingKey = string type stream interface { Execute(*configRequest) error Close() error + Shutdown() } type streamFunc func(*logger.Logger, routingKey) (stream, error) @@ -112,3 +113,16 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e return nil } + +// Shutdown shutdowns the router because Agent is stopping. +func (r *router) Shutdown() { + keys := r.routes.Keys() + for _, k := range keys { + p, ok := r.routes.Get(k) + if !ok { + continue + } + p.(stream).Shutdown() + r.routes.Remove(k) + } +} diff --git a/x-pack/elastic-agent/pkg/agent/application/router_test.go b/x-pack/elastic-agent/pkg/agent/application/router_test.go index 6edcd145eeaa..4054692281b2 100644 --- a/x-pack/elastic-agent/pkg/agent/application/router_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/router_test.go @@ -209,6 +209,8 @@ func (m *mockStream) Close() error { return nil } +func (m *mockStream) Shutdown() {} + func (m *mockStream) event(op rOp, args ...interface{}) { m.notify(m.rk, op, args...) } diff --git a/x-pack/elastic-agent/pkg/agent/application/stream.go b/x-pack/elastic-agent/pkg/agent/application/stream.go index 88262300d9ab..bfe015c46610 100644 --- a/x-pack/elastic-agent/pkg/agent/application/stream.go +++ b/x-pack/elastic-agent/pkg/agent/application/stream.go @@ -35,6 +35,10 @@ func (b *operatorStream) Execute(cfg *configRequest) error { return b.configHandler.HandleConfig(cfg) } +func (b *operatorStream) Shutdown() { + b.configHandler.Shutdown() +} + func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) { return func(log *logger.Logger, id routingKey) (stream, error) { // new operator per stream to isolate processes without using tags diff --git a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go index e53e16b08e56..82df2a402e99 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go +++ b/x-pack/elastic-agent/pkg/agent/operation/monitoring_test.go @@ -152,7 +152,8 @@ func (*testMonitorableApp) Started() bool { return false } func (*testMonitorableApp) Start(_ context.Context, _ app.Taggable, cfg map[string]interface{}) error { return nil } -func (*testMonitorableApp) Stop() {} +func (*testMonitorableApp) Stop() {} +func (*testMonitorableApp) Shutdown() {} func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error { return nil } diff --git a/x-pack/elastic-agent/pkg/agent/operation/operation.go b/x-pack/elastic-agent/pkg/agent/operation/operation.go index ebe6bc3065db..d14852d7eabb 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operation.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operation.go @@ -38,6 +38,7 @@ type Application interface { Started() bool Start(ctx context.Context, p app.Taggable, cfg map[string]interface{}) error Stop() + Shutdown() Configure(ctx context.Context, config map[string]interface{}) error Monitor() monitoring.Monitor State() state.State diff --git a/x-pack/elastic-agent/pkg/agent/operation/operator.go b/x-pack/elastic-agent/pkg/agent/operation/operator.go index ed0b2d0ba437..1790b03a5a0f 100644 --- a/x-pack/elastic-agent/pkg/agent/operation/operator.go +++ b/x-pack/elastic-agent/pkg/agent/operation/operator.go @@ -158,6 +158,13 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error { return nil } +// Shutdown handles shutting down the running apps for Agent shutdown. +func (o *Operator) Shutdown() { + for _, app := range o.apps { + app.Shutdown() + } +} + // Start starts a new process based on a configuration // specific configuration of new process is passed func (o *Operator) start(p Descriptor, cfg map[string]interface{}) (err error) { diff --git a/x-pack/elastic-agent/pkg/core/plugin/process/app.go b/x-pack/elastic-agent/pkg/core/plugin/process/app.go index 7696665bbd02..cc6c4c838959 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/process/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/process/app.go @@ -155,6 +155,12 @@ func (a *Application) Stop() { a.setState(state.Stopped, "Stopped") } +// Shutdown stops the application (aka. subprocess). +func (a *Application) Shutdown() { + a.logger.Infof("Signaling application to stop because of shutdown: %s", a.id) + a.Stop() +} + // SetState sets the status of the application. func (a *Application) SetState(status state.Status, msg string) { a.appLock.Lock() diff --git a/x-pack/elastic-agent/pkg/core/plugin/service/app.go b/x-pack/elastic-agent/pkg/core/plugin/service/app.go index d4975d828c64..3c8a42a0db88 100644 --- a/x-pack/elastic-agent/pkg/core/plugin/service/app.go +++ b/x-pack/elastic-agent/pkg/core/plugin/service/app.go @@ -242,6 +242,25 @@ func (a *Application) Stop() { a.stopCredsListener() } +// Shutdown disconnects the service, but doesn't signal it to stop. +func (a *Application) Shutdown() { + a.appLock.Lock() + defer a.appLock.Unlock() + + if a.srvState == nil { + return + } + + // destroy the application in the server, this skips sending + // the expected stopping state to the service + a.setState(state.Stopped, "Stopped") + a.srvState.Destroy() + a.srvState = nil + + a.cleanUp() + a.stopCredsListener() +} + // OnStatusChange is the handler called by the GRPC server code. // // It updates the status of the application and handles restarting the application is needed. diff --git a/x-pack/elastic-agent/pkg/core/process/cmd_cred.go b/x-pack/elastic-agent/pkg/core/process/cmd_darwin.go similarity index 97% rename from x-pack/elastic-agent/pkg/core/process/cmd_cred.go rename to x-pack/elastic-agent/pkg/core/process/cmd_darwin.go index 8b51e2d6265a..0d1921941573 100644 --- a/x-pack/elastic-agent/pkg/core/process/cmd_cred.go +++ b/x-pack/elastic-agent/pkg/core/process/cmd_darwin.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -// +build linux darwin +// +build darwin package process diff --git a/x-pack/elastic-agent/pkg/core/process/cmd_linux.go b/x-pack/elastic-agent/pkg/core/process/cmd_linux.go new file mode 100644 index 000000000000..3c28ab54c4be --- /dev/null +++ b/x-pack/elastic-agent/pkg/core/process/cmd_linux.go @@ -0,0 +1,44 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +// +build linux + +package process + +import ( + "math" + "os" + "os/exec" + "path/filepath" + "syscall" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" +) + +func getCmd(logger *logger.Logger, path string, env []string, uid, gid int, arg ...string) *exec.Cmd { + cmd := exec.Command(path, arg...) + cmd.Env = append(cmd.Env, os.Environ()...) + cmd.Env = append(cmd.Env, env...) + cmd.Dir = filepath.Dir(path) + if isInt32(uid) && isInt32(gid) { + cmd.SysProcAttr = &syscall.SysProcAttr{ + // on shutdown all sub-processes are sent SIGTERM, in the case that the Agent dies or is -9 killed + // then also kill the children (only supported on linux) + Pdeathsig: syscall.SIGKILL, + Credential: &syscall.Credential{ + Uid: uint32(uid), + Gid: uint32(gid), + NoSetGroups: true, + }, + } + } else { + logger.Errorf("provided uid or gid for %s is invalid. uid: '%d' gid: '%d'.", path, uid, gid) + } + + return cmd +} + +func isInt32(val int) bool { + return val >= 0 && val <= math.MaxInt32 +}