Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ingest Manager] Restart process on output change #24907

Merged
merged 10 commits into from
Apr 19, 2021
19 changes: 10 additions & 9 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@
- Fixed Monitoring filebeat and metricbeat not connecting to Agent over GRPC {pull}23843[23843]
- Fixed make status readable in the log. {pull}23849[23849]
- Windows agent doesn't uninstall with a lowercase `c:` drive in the path {pull}23998[23998]
- Fix reloading of log level for services {pull}[24055]24055
- Fix: Successfully installed and enrolled agent running standalone{pull}[24128]24128
- Make installer atomic on windows {pull}[24253]24253
- Remove installed services on agent uninstall {pull}[24151]24151
- Fix failing installation on windows 7 {pull}[24387]24387
- Fix capabilities resolution in inspect command {pull}[24346]24346
- Fix windows installer during enroll {pull}[24343]24343
- Logging to file disabled on enroll {issue}[24173]24173
- Prevent uninstall failures on empty config {pull}[24838]24838
- Fix reloading of log level for services {pull}24055[24055]
- Fix: Successfully installed and enrolled agent running standalone{pull}24128[24128]
- Make installer atomic on windows {pull}24253[24253]
- Remove installed services on agent uninstall {pull}24151[24151]
- Fix failing installation on windows 7 {pull}24387[24387]
- Fix capabilities resolution in inspect command {pull}24346[24346]
- Fix windows installer during enroll {pull}24343[24343]
- Logging to file disabled on enroll {issue}24173[24173]
- Prevent uninstall failures on empty config {pull}24838[24838]

==== New features

Expand All @@ -63,6 +63,7 @@
- Add `event.dataset` to all events {pull}20076[20076]
- Send datastreams fields {pull}20416[20416]
- Agent supports capabilities definition {pull}23848[23848]
- Restart process on output change {pull}24907[24907]

[[release-notes-7.8.0]]
=== Elastic Agent version 7.8.0
Expand Down
29 changes: 15 additions & 14 deletions x-pack/elastic-agent/pkg/agent/program/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,21 @@ var ErrMissingWhen = errors.New("program must define a 'When' expression")
// NOTE: Current spec are build at compile time, we want to revisit that to allow other program
// to register their spec in a secure way.
type Spec struct {
Name string `yaml:"name"`
ServicePort int `yaml:"service,omitempty"`
Cmd string `yaml:"cmd"`
Args []string `yaml:"args"`
Artifact string `yaml:"artifact"`
ActionInputTypes []string `yaml:"action_input_types,omitempty"`
LogPaths map[string]string `yaml:"log_paths,omitempty"`
MetricEndpoints map[string]string `yaml:"metric_endpoints,omitempty"`
Rules *transpiler.RuleList `yaml:"rules"`
CheckInstallSteps *transpiler.StepList `yaml:"check_install"`
PostInstallSteps *transpiler.StepList `yaml:"post_install"`
PreUninstallSteps *transpiler.StepList `yaml:"pre_uninstall"`
When string `yaml:"when"`
Constraints string `yaml:"constraints"`
Name string `yaml:"name"`
ServicePort int `yaml:"service,omitempty"`
Cmd string `yaml:"cmd"`
Args []string `yaml:"args"`
Artifact string `yaml:"artifact"`
ActionInputTypes []string `yaml:"action_input_types,omitempty"`
LogPaths map[string]string `yaml:"log_paths,omitempty"`
MetricEndpoints map[string]string `yaml:"metric_endpoints,omitempty"`
Rules *transpiler.RuleList `yaml:"rules"`
CheckInstallSteps *transpiler.StepList `yaml:"check_install"`
PostInstallSteps *transpiler.StepList `yaml:"post_install"`
PreUninstallSteps *transpiler.StepList `yaml:"pre_uninstall"`
When string `yaml:"when"`
Constraints string `yaml:"constraints"`
RestartOnOutputChange bool `yaml:"restart_on_output_change,omitempty"`
}

// ReadSpecs reads all the specs that match the provided globbing path.
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/pkg/agent/program/supported.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

79 changes: 79 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package plugin

import (
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

type configFetcher interface {
Config() string
}

// IsRestartNeeded returns true if
// - spec is configured to support restart on change
// - output changes in between configs
func IsRestartNeeded(log *logger.Logger, spec program.Spec, cfgFetch configFetcher, newCfg map[string]interface{}) bool {
// compare outputs
curCfgStr := cfgFetch.Config()
if curCfgStr == "" {
// no config currently applied
return false
}

currentOutput, err := getOutputConfigFromString(curCfgStr)
if err != nil {
log.Errorf("failed to retrieve output config from current state: %v", err)
return false
}

newOutput, err := getOutputConfigFromMap(newCfg)
if err != nil {
log.Errorf("failed to retrieve output config from new state: %v", err)
return false
}

// restart needed only if specified and output changed
return spec.RestartOnOutputChange && currentOutput != newOutput
}

func getOutputConfigFromString(cfgString string) (string, error) {
cfg, err := config.NewConfigFrom(cfgString)
if err != nil {
return "", err
}

cfgMap, err := cfg.ToMapStr()
if err != nil {
return "", err
}

return getOutputConfigFromMap(cfgMap)
}

func getOutputConfigFromMap(cfgMap map[string]interface{}) (string, error) {
outputCfgIface, found := cfgMap["output"]
if !found {
// output not found not an error
return "", nil
}

outputCfg, ok := outputCfgIface.(map[string]interface{})
if !ok {
return "", errors.New("not a map")
}

cfgStr, err := yaml.Marshal(outputCfg)
if err != nil {
return "", errors.New(err, errors.TypeApplication)
}

return string(cfgStr), nil
}
95 changes: 95 additions & 0 deletions x-pack/elastic-agent/pkg/core/plugin/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package plugin

import (
"testing"

"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/program"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
)

func TestRestartNeeded(t *testing.T) {
tt := []struct {
Name string
OldOutput map[string]interface{}
NewOutput map[string]interface{}
ShouldRestart bool

ExpectedRestart bool
}{
{
"same empty output",
map[string]interface{}{},
map[string]interface{}{},
true,
false,
},
{
"same not empty output",
map[string]interface{}{"output": map[string]interface{}{"username": "user", "password": "123456"}},
map[string]interface{}{"output": map[string]interface{}{"username": "user", "password": "123456"}},
true,
false,
},
{
"different empty output",
map[string]interface{}{},
map[string]interface{}{"output": map[string]interface{}{"username": "user", "password": "123456"}},
true,
false,
},
{
"different not empty output",
map[string]interface{}{"output": map[string]interface{}{"username": "user", "password": "123456"}},
map[string]interface{}{"output": map[string]interface{}{"username": "user", "password": "s3cur3_Pa55;"}},
true,
true,
},
{
"different not empty output no restart required",
map[string]interface{}{"output": map[string]interface{}{"username": "user", "password": "123456"}},
map[string]interface{}{"output": map[string]interface{}{"username": "user", "password": "s3cur3_Pa55;"}},
false,
false,
},
}

for _, tc := range tt {
t.Run(tc.Name, func(t *testing.T) {
cf, err := newTestConfigFetcher(tc.OldOutput)
require.NoError(t, err)
s := testProgramSpec(tc.ShouldRestart)
l, _ := logger.New("tst", false)

IsRestartNeeded(l, s, cf, tc.NewOutput)
})
}
}

func newTestConfigFetcher(cfg map[string]interface{}) (*testConfigFetcher, error) {
cfgStr, err := yaml.Marshal(cfg)
if err != nil {
return nil, errors.New(err, errors.TypeApplication)
}

return &testConfigFetcher{cfg: string(cfgStr)}, nil
}

type testConfigFetcher struct {
cfg string
}

func (f testConfigFetcher) Config() string { return f.cfg }

func testProgramSpec(restartOnOutput bool) program.Spec {
return program.Spec{
RestartOnOutputChange: restartOnOutput,
}
}
17 changes: 15 additions & 2 deletions x-pack/elastic-agent/pkg/core/plugin/process/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
)

// Configure configures the application with the passed configuration.
func (a *Application) Configure(_ context.Context, config map[string]interface{}) (err error) {
func (a *Application) Configure(ctx context.Context, config map[string]interface{}) (err error) {
defer func() {
if err != nil {
// inject App metadata
Expand All @@ -37,10 +38,22 @@ func (a *Application) Configure(_ context.Context, config map[string]interface{}
if err != nil {
return errors.New(err, errors.TypeApplication)
}

isRestartNeeded := plugin.IsRestartNeeded(a.logger, a.Spec(), a.srvState, config)

err = a.srvState.UpdateConfig(string(cfgStr))
if err != nil {
return errors.New(err, errors.TypeApplication)
}

return nil
if isRestartNeeded {
a.logger.Infof("initiating restart of '%s' due to config change", a.Name())
a.appLock.Unlock()
a.Stop()
err = a.Start(ctx, a.desc, config)
// lock back so it wont panic on deferred unlock
a.appLock.Lock()
}

return err
}
Loading