Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Properly stop subprocess when receiving SIGTERM #19567

Merged
merged 3 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
- Avoid watching monitor logs {pull}18723[18723]
- Guard against empty stream.datasource and namespace {pull}18769[18769]
- Fix install service script for windows {pull}18814[18814]
- Properly stops subprocess on shutdown {pull}19567[19567]

==== New features

Expand Down
9 changes: 7 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/local_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ import (

// emitterFunc is a callback that receives a configuration and returns an error.
type emitterFunc func(*config.Config) error

// ConfigHandler is capable of handling a config, performing actions on it, and
// shutting down any long running process.
type ConfigHandler interface {
// HandleConfig processes a configuration request and reports any failure.
HandleConfig(configrequest.Request) error
// Shutdown asks the handler to shut down; it reports no result.
Shutdown()
}

// discoverFunc returns a list of strings (presumably configuration file paths
// — confirm against the discoverer) or an error.
type discoverFunc func() ([]string, error)
Expand All @@ -39,6 +40,7 @@ type Local struct {
bgContext context.Context
cancelCtxFn context.CancelFunc
log *logger.Logger
router *router
source source
agentInfo *info.AgentInfo
srv *server.Server
Expand Down Expand Up @@ -97,6 +99,7 @@ func newLocal(
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
localApplication.router = router

discover := discoverer(pathConfigFile, c.Management.Path)
emit := emitter(log, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}, Filters: []filterFunc{filters.ConstraintFilter}}, monitor)
Expand Down Expand Up @@ -132,9 +135,11 @@ func (l *Local) Start() error {

// Stop stops a local agent.
//
// The source is stopped first, then the background context is cancelled, the
// router is shut down and the server is stopped. The error reported by the
// source is returned only after the remaining teardown steps have run.
func (l *Local) Stop() error {
	// Stop the source exactly once and keep its error, so the rest of the
	// teardown still runs before the error is reported to the caller.
	err := l.source.Stop()
	l.cancelCtxFn()
	l.router.Shutdown()
	l.srv.Stop()
	return err
}

// AgentInfo retrieves agent information.
Expand Down
3 changes: 3 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Managed struct {
api apiClient
agentInfo *info.AgentInfo
gateway *fleetGateway
router *router
srv *server.Server
}

Expand Down Expand Up @@ -140,6 +141,7 @@ func newManaged(
if err != nil {
return nil, errors.New(err, "fail to initialize pipeline router")
}
managedApplication.router = router

emit := emitter(
log,
Expand Down Expand Up @@ -221,6 +223,7 @@ func (m *Managed) Start() error {
func (m *Managed) Stop() error {
// Deferred so the message is logged after the teardown steps below have run.
defer m.log.Info("Agent is stopped")
// Teardown order: cancel the agent context, shut down the router, then stop the server.
m.cancelCtxFn()
m.router.Shutdown()
m.srv.Stop()
// None of the steps above report an error, so Stop always returns nil.
return nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func (m *mockStreamStore) Close() error {
return nil
}

// Shutdown is a no-op on the mock.
func (m *mockStreamStore) Shutdown() {}

const fleetResponse = `
{
"action": "checkin",
Expand Down
14 changes: 14 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type routingKey = string
// stream is the unit the router keeps per route: it can execute a config
// request, be closed, and be shut down.
type stream interface {
// Execute applies the given config request and reports any failure.
Execute(*configRequest) error
// Close closes the stream and reports any failure.
Close() error
// Shutdown is called by the router when the Agent is stopping.
Shutdown()
}

// streamFunc returns a stream (or an error) for the given logger and routing key.
type streamFunc func(*logger.Logger, routingKey) (stream, error)
Expand Down Expand Up @@ -112,3 +113,16 @@ func (r *router) Dispatch(id string, grpProg map[routingKey][]program.Program) e

return nil
}

// Shutdown shuts the router down because the Agent is stopping.
//
// Every registered route is asked to shut down and is then removed from the
// routing table. Keys that no longer resolve to a route are skipped.
func (r *router) Shutdown() {
	for _, key := range r.routes.Keys() {
		route, found := r.routes.Get(key)
		if found {
			route.(stream).Shutdown()
			r.routes.Remove(key)
		}
	}
}
2 changes: 2 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ func (m *mockStream) Close() error {
return nil
}

// Shutdown is a no-op on the mock.
func (m *mockStream) Shutdown() {}

// event forwards op and args to the mock's notify callback together with m.rk
// (presumably the stream's routing key — confirm against the mock's definition).
func (m *mockStream) event(op rOp, args ...interface{}) {
m.notify(m.rk, op, args...)
}
Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (b *operatorStream) Execute(cfg *configRequest) error {
return b.configHandler.HandleConfig(cfg)
}

// Shutdown forwards the shutdown request to the stream's config handler.
func (b *operatorStream) Shutdown() {
b.configHandler.Shutdown()
}

func streamFactory(ctx context.Context, cfg *config.Config, srv *server.Server, r state.Reporter, m monitoring.Monitor) func(*logger.Logger, routingKey) (stream, error) {
return func(log *logger.Logger, id routingKey) (stream, error) {
// new operator per stream to isolate processes without using tags
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func (*testMonitorableApp) Started() bool { return false }
// Start is a test stub: it ignores its arguments and always returns nil.
func (*testMonitorableApp) Start(_ context.Context, _ app.Taggable, cfg map[string]interface{}) error {
return nil
}
// Stop is a no-op on the test application.
func (*testMonitorableApp) Stop() {}

// Shutdown is a no-op on the test application.
func (*testMonitorableApp) Shutdown() {}
// Configure is a test stub: it ignores its arguments and always returns nil.
func (*testMonitorableApp) Configure(_ context.Context, config map[string]interface{}) error {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type Application interface {
Started() bool
Start(ctx context.Context, p app.Taggable, cfg map[string]interface{}) error
Stop()
Shutdown()
Configure(ctx context.Context, config map[string]interface{}) error
Monitor() monitoring.Monitor
State() state.State
Expand Down
7 changes: 7 additions & 0 deletions x-pack/elastic-agent/pkg/agent/operation/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ func (o *Operator) HandleConfig(cfg configrequest.Request) error {
return nil
}

// Shutdown asks every application tracked by the operator to shut down, as
// part of the Agent's own shutdown.
//
// NOTE(review): o.apps is iterated here without taking any lock in this
// method — confirm that callers guarantee exclusive access.
func (o *Operator) Shutdown() {
	for _, running := range o.apps {
		running.Shutdown()
	}
}

// start starts a new process based on a configuration;
// the configuration specific to the new process is passed in cfg.
func (o *Operator) start(p Descriptor, cfg map[string]interface{}) (err error) {
Expand Down
6 changes: 6 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/process/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ func (a *Application) Stop() {
a.setState(state.Stopped, "Stopped")
}

// Shutdown stops the application (aka. subprocess).
//
// It logs that the application is being signaled to stop because of shutdown
// and then delegates to Stop.
func (a *Application) Shutdown() {
a.logger.Infof("Signaling application to stop because of shutdown: %s", a.id)
a.Stop()
}

// SetState sets the status of the application.
func (a *Application) SetState(status state.Status, msg string) {
a.appLock.Lock()
Expand Down
19 changes: 19 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/service/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,25 @@ func (a *Application) Stop() {
a.stopCredsListener()
}

// Shutdown disconnects the service without signaling it to stop.
//
// The whole operation runs under appLock. When there is no server-side state
// (srvState is nil) the call does nothing.
func (a *Application) Shutdown() {
	a.appLock.Lock()
	defer a.appLock.Unlock()

	if a.srvState != nil {
		// Destroying the application in the server skips sending the
		// expected stopping state to the service.
		a.setState(state.Stopped, "Stopped")
		a.srvState.Destroy()
		a.srvState = nil

		a.cleanUp()
		a.stopCredsListener()
	}
}

// OnStatusChange is the handler called by the GRPC server code.
//
// It updates the status of the application and handles restarting the application if needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build linux darwin
// +build darwin

package process

Expand Down
44 changes: 44 additions & 0 deletions x-pack/elastic-agent/pkg/core/process/cmd_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// +build linux

package process

import (
"math"
"os"
"os/exec"
"path/filepath"
"syscall"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

// getCmd builds the exec.Cmd used to launch the binary at path with the given
// arguments.
//
// The command's environment is the current process environment followed by
// the entries in env, and its working directory is the directory containing
// path. When both uid and gid pass isInt32, the command is set up to run with
// those credentials and with Pdeathsig set to SIGKILL. Otherwise an error is
// logged and the command is returned without any SysProcAttr.
func getCmd(logger *logger.Logger, path string, env []string, uid, gid int, arg ...string) *exec.Cmd {
	cmd := exec.Command(path, arg...)
	cmd.Env = append(cmd.Env, os.Environ()...)
	cmd.Env = append(cmd.Env, env...)
	cmd.Dir = filepath.Dir(path)

	if !isInt32(uid) || !isInt32(gid) {
		logger.Errorf("provided uid or gid for %s is invalid. uid: '%d' gid: '%d'.", path, uid, gid)
		return cmd
	}

	// On shutdown all sub-processes are sent SIGTERM. If the Agent dies or is
	// -9 killed, Pdeathsig makes sure the children are killed as well (only
	// supported on linux).
	// NOTE(review): the syscall package docs describe Pdeathsig as being sent
	// when the creating thread dies, which may precede process exit — confirm
	// this is acceptable for how the Agent spawns its children.
	creds := &syscall.Credential{
		Uid:         uint32(uid),
		Gid:         uint32(gid),
		NoSetGroups: true,
	}
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Pdeathsig:  syscall.SIGKILL,
		Credential: creds,
	}

	return cmd
}

// isInt32 reports whether val is non-negative and no larger than math.MaxInt32.
func isInt32(val int) bool {
	if val < 0 {
		return false
	}
	return val <= math.MaxInt32
}