From 0d08e88e4a3846cc5f3d83410ddb0209404af516 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 31 May 2023 15:50:23 +0200 Subject: [PATCH] Fix input reload issue in Elastic-Agent with infinite retry logic (#35250) This commit addresses the input reload issue in Elastic-Agent by introducing an infinite retry logic in the ManagerV2. The implemented logic mirrors the configuration reload behavior of a standalone Beat. When reloading inputs, if there is at least one occurrence of 'common.ErrInputNotFinished', the 'forceReload' flag is set to true, and the debounce timer is initiated. This process will repeat until no 'common.ErrInputNotFinished' error is encountered. Additionally, the 'changeDebounce' period is extended to 1 second, and the 'forceReloadDebounce' period is set to 10 times the 'changeDebounce' value. --------- Co-authored-by: Blake Rouse Co-authored-by: Anderson Queiroz Co-authored-by: Denis (cherry picked from commit 137bc81e8cf966f2195fe590663c06b04dca74dd) --- CHANGELOG.next.asciidoc | 1 + Jenkinsfile | 2 +- filebeat/input/log/input.go | 35 +- libbeat/common/errors.go | 1 + x-pack/filebeat/magefile.go | 1 + .../tests/integration/framework_test.go | 136 +++++++ .../tests/integration/input_reload_test.go | 364 ++++++++++++++++++ x-pack/libbeat/management/config.go | 16 +- .../libbeat/management/input_reload_test.go | 207 ++++++++++ x-pack/libbeat/management/managerV2.go | 144 +++++-- x-pack/libbeat/management/managerV2_test.go | 32 +- x-pack/libbeat/management/test_utils.go | 37 ++ 12 files changed, 899 insertions(+), 77 deletions(-) create mode 100644 x-pack/filebeat/tests/integration/framework_test.go create mode 100644 x-pack/filebeat/tests/integration/input_reload_test.go create mode 100644 x-pack/libbeat/management/input_reload_test.go create mode 100644 x-pack/libbeat/management/test_utils.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 115ea2048d4d..361417994e0b 100644 --- a/CHANGELOG.next.asciidoc +++ 
b/CHANGELOG.next.asciidoc @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Sanitize filenames for request tracer in cel input. {pull}35154[35154] - Fix accidental error overwrite in defer statement in entityanalytics Azure AD input. {issue}35153[35153] {pull}35169[35169] - Fixing the grok expression outputs of log files {pull}35221[35221] +- Fixes "Can only start an input when all related states are finished" error when running under Elastic-Agent {pull}35250[35250] {issue}33653[33653] - Move repeated Windows event channel not found errors in winlog input to debug level. {issue}35314[35314] {pull}35317[35317] - Fix crash when processing forwarded logs missing a message. {issue}34705[34705] {pull}34865[34865] - Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433] diff --git a/Jenkinsfile b/Jenkinsfile index 3d8f004297f9..1da726b38cd1 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -707,7 +707,7 @@ def withBeatsEnv(Map args = [:], Closure body) { error("Error '${err.toString()}'") } finally { if (archive) { - archiveArtifacts(allowEmptyArchive: true, artifacts: "${directory}/build/system-tests/docker-logs/TEST-docker-compose-*.log") + archiveArtifacts(allowEmptyArchive: true, artifacts: "${directory}/build/system-tests/docker-logs/TEST-docker-compose-*.log, ${directory}/build/integration-tests/**/**") archiveTestOutput(directory: directory, testResults: testResults, artifacts: artifacts, id: args.id, upload: upload) } tearDown() diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index eeaa2de0c915..4882735d36b7 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -93,7 +93,9 @@ func NewInput( cleanupNeeded := true cleanupIfNeeded := func(f func() error) { if cleanupNeeded { - f() + if err := f(); err != nil { + logp.L().Named("input.log").Errorf("clean up function returned an error: %w", err) + } } } @@ -103,10 +105,10 @@ func 
NewInput( return nil, err } if err := inputConfig.resolveRecursiveGlobs(); err != nil { - return nil, fmt.Errorf("Failed to resolve recursive globs in config: %v", err) + return nil, fmt.Errorf("Failed to resolve recursive globs in config: %w", err) } if err := inputConfig.normalizeGlobPatterns(); err != nil { - return nil, fmt.Errorf("Failed to normalize globs patterns: %v", err) + return nil, fmt.Errorf("Failed to normalize globs patterns: %w", err) } if len(inputConfig.Paths) == 0 { @@ -115,7 +117,7 @@ func NewInput( identifier, err := file.NewStateIdentifier(inputConfig.FileIdentity) if err != nil { - return nil, fmt.Errorf("failed to initialize file identity generator: %+v", err) + return nil, fmt.Errorf("failed to initialize file identity generator: %w", err) } // Note: underlying output. @@ -192,7 +194,10 @@ func (p *Input) loadStates(states []file.State) error { // In case a input is tried to be started with an unfinished state matching the glob pattern if !state.Finished { - return &common.ErrInputNotFinished{State: state.String()} + return &common.ErrInputNotFinished{ + State: state.String(), + File: state.Fileinfo.Name(), + } } // Convert state to current identifier if different @@ -466,7 +471,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) { var absolutePath string absolutePath, err = filepath.Abs(path) if err != nil { - return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %s", absolutePath, err) + return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %w", absolutePath, err) } p.logger.Debugf("Check file for harvesting: %s", absolutePath) // Create new state for comparison @@ -548,7 +553,7 @@ func (p *Input) scan() { if isNewState { logger.Debugf("Start harvester for new file: %s", newState.Source) err := p.startHarvester(logger, newState, 0) - if err == errHarvesterLimit { + if errors.Is(err, errHarvesterLimit) { logger.Debugf(harvesterErrMsg, newState.Source, err) continue } @@ 
-673,11 +678,7 @@ func (p *Input) isIgnoreOlder(state file.State) bool { } modTime := state.Fileinfo.ModTime() - if time.Since(modTime) > p.config.IgnoreOlder { - return true - } - - return false + return time.Since(modTime) > p.config.IgnoreOlder } // isCleanInactive checks if the given state false under clean_inactive @@ -688,11 +689,7 @@ func (p *Input) isCleanInactive(state file.State) bool { } modTime := state.Fileinfo.ModTime() - if time.Since(modTime) > p.config.CleanInactive { - return true - } - - return false + return time.Since(modTime) > p.config.CleanInactive } // subOutletWrap returns a factory method that will wrap the passed outlet @@ -748,7 +745,7 @@ func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int err = h.Setup() if err != nil { p.numHarvesters.Dec() - return fmt.Errorf("error setting up harvester: %s", err) + return fmt.Errorf("error setting up harvester: %w", err) } // Update state before staring harvester @@ -783,7 +780,7 @@ func (p *Input) updateState(state file.State) error { stateToRemove := file.State{Id: state.PrevId, TTL: 0, Finished: true, Meta: nil} err := p.doUpdate(stateToRemove) if err != nil { - return fmt.Errorf("failed to remove outdated states based on prev_id: %v", err) + return fmt.Errorf("failed to remove outdated states based on prev_id: %w", err) } } diff --git a/libbeat/common/errors.go b/libbeat/common/errors.go index 68fecb8f5504..9f5248e815ed 100644 --- a/libbeat/common/errors.go +++ b/libbeat/common/errors.go @@ -24,6 +24,7 @@ import ( // ErrInputNotFinished struct for reporting errors related to not finished inputs type ErrInputNotFinished struct { State string + File string } // Error method of ErrInputNotFinished diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index 0474e41e61e9..bf8f0e7040f1 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -179,6 +179,7 @@ func IntegTest() { // GoIntegTest starts the docker containers and executes 
the Go integration tests. func GoIntegTest(ctx context.Context) error { + mg.Deps(BuildSystemTestBinary) return devtools.GoIntegTestFromHost(ctx, devtools.DefaultGoTestIntegrationFromHostArgs()) } diff --git a/x-pack/filebeat/tests/integration/framework_test.go b/x-pack/filebeat/tests/integration/framework_test.go new file mode 100644 index 000000000000..1a65eb96da8e --- /dev/null +++ b/x-pack/filebeat/tests/integration/framework_test.go @@ -0,0 +1,136 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bufio" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type BeatProc struct { + Binary string + Args []string + Cmd *exec.Cmd + t *testing.T + tempDir string +} + +// NewBeat creates a new Beat process from the system tests binary. +// It sets some required options like the home path, logging, etc. +// `tempDir` will be used as home and logs directory for the Beat +// `args` will be passed as CLI arguments to the Beat +func NewBeat(t *testing.T, binary string, tempDir string, args ...string) BeatProc { + p := BeatProc{ + t: t, + Binary: binary, + Args: append([]string{ + "--systemTest", + "--path.home", tempDir, + "--path.logs", tempDir, + "-E", "logging.to_files=true", + "-E", "logging.files.rotateeverybytes=104857600", // About 100MB + }, args...), + tempDir: tempDir, + } + return p +} + +// Start starts the Beat process +func (b *BeatProc) Start() { + t := b.t + fullPath, err := filepath.Abs(b.Binary) + if err != nil { + t.Fatalf("could not get full path from %q, err: %s", b.Binary, err) + } + b.Cmd = exec.Command(fullPath, b.Args...) 
+ + if err := b.Cmd.Start(); err != nil { + t.Fatalf("could not start process: %s", err) + } + t.Cleanup(func() { + pid := b.Cmd.Process.Pid + if err := b.Cmd.Process.Kill(); err != nil { + t.Fatalf("could not stop process with PID: %d, err: %s", pid, err) + } + }) +} + +// LogContains reports whether `s` is a substring of any log line, +// it will open the log file on every call, read it until EOF, +// then close it. +func (b *BeatProc) LogContains(s string) bool { + t := b.t + logFile := b.openLogFile() + defer func() { + if err := logFile.Close(); err != nil { + // That's not quite a test error, but it can impact + // next executions of LogContains, so treat it as an error + t.Errorf("could not close log file: %s", err) + } + }() + + r := bufio.NewReader(logFile) + for { + line, err := r.ReadString('\n') + if err != nil { + if err != io.EOF { + t.Fatalf("error reading log file '%s': %s", logFile.Name(), err) + } + break + } + if strings.Contains(line, s) { + return true + } + } + + return false +} + +// openLogFile opens the log file for reading and returns it. +// It does not register any cleanup function, the caller is +// responsible for closing the returned file. 
+func (b *BeatProc) openLogFile() *os.File { + t := b.t + glob := fmt.Sprintf("%s-*.ndjson", filepath.Join(b.tempDir, "filebeat")) + files, err := filepath.Glob(glob) + if err != nil { + t.Fatalf("could not expand log file glob: %s", err) + } + + require.Eventually(t, func() bool { + files, err = filepath.Glob(glob) + if err != nil { + t.Fatalf("could not expand log file glob: %s", err) + } + return len(files) == 1 + }, 5*time.Second, 100*time.Millisecond, + "waiting for log file matching glob '%s' to be created", glob) + + // On a normal operation there must be a single log, if there are more + // than one, then there is an issue and the Beat is logging too much, + // which is enough to stop the test + if len(files) != 1 { + t.Fatalf("there must be only one log file for %s, found: %d", + glob, len(files)) + } + + f, err := os.Open(files[0]) + if err != nil { + t.Fatalf("could not open log file '%s': %s", files[0], err) + } + + return f +} diff --git a/x-pack/filebeat/tests/integration/input_reload_test.go b/x-pack/filebeat/tests/integration/input_reload_test.go new file mode 100644 index 000000000000..cab3b273a9bf --- /dev/null +++ b/x-pack/filebeat/tests/integration/input_reload_test.go @@ -0,0 +1,364 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +//go:build integration + +package integration + +import ( + "context" + "fmt" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/structpb" + + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +// TestInputReloadUnderElasticAgent will start a Filebeat and cause the input +// reload issue described on https://github.com/elastic/beats/issues/33653. +// In short, a new input for a file needs to be started while there are still +// events from that file in the publishing pipeline, effectively keeping +// the harvester status as `finished: false`, which prevents the new input +// from starting. +// +// This test ensures Filebeat can gracefully recover from this situation +// and will eventually re-start harvesting the file. +// +// In case of a test failure the directory with Filebeat logs and +// all other supporting files will be kept on build/integration-tests. +// +// Run the tests with -v flag to print the temporary folder used. +func TestInputReloadUnderElasticAgent(t *testing.T) { + // First things first, ensure ES is running and we can connect to it. + // If ES is not running, the test will time out and the only way to know + // what caused it is going through Filebeat's logs. + ensureESIsRunning(t) + + // We create our own temp dir so the files can be persisted + // in case the test fails. This will help debugging issues + // locally and on CI. 
+ // + // testSucceeded will be set to 'true' as the very last thing on this test, + // it allows us to use t.Cleanup to remove the temporary files + testSucceeded := false + tempDir, err := filepath.Abs(filepath.Join("../../build/integration-tests/", + fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()))) + if err != nil { + t.Fatal(err) + } + + if err := os.MkdirAll(tempDir, 0766); err != nil { + t.Fatalf("cannot create tmp dir: %s, msg: %s", err, err.Error()) + } + t.Logf("Temporary directory: %s", tempDir) + t.Cleanup(func() { + if testSucceeded { + if err := os.RemoveAll(tempDir); err != nil { + t.Fatalf("could not remove temp dir '%s': %s", tempDir, err) + } + t.Logf("Temporary directory '%s' removed", tempDir) + } + }) + + logFilePath := filepath.Join(tempDir, "flog.log") + generateLogFile(t, logFilePath) + var units = [][]*proto.UnitExpected{ + { + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + Source: requireNewStruct(t, + map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"http://localhost:9200"}, + "username": "admin", + "password": "testing", + "protocol": "http", + "enabled": true, + }), + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "log-input", + Type: "log", + Name: "log", + Streams: []*proto.Stream{ + { + Id: "log-input-1", + Source: requireNewStruct(t, map[string]interface{}{ + "enabled": true, + "type": "log", + "paths": []interface{}{logFilePath}, + }), + }, + }, + }, + }, + }, + { + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", 
+ Type: "elasticsearch", + Name: "elasticsearch", + Source: requireNewStruct(t, + map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"http://localhost:9200"}, + "username": "admin", + "password": "testing", + "protocol": "http", + "enabled": true, + }), + }, + }, + { + Id: "input-unit-2", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "log-input", + Type: "log", + Name: "log", + Streams: []*proto.Stream{ + { + Id: "log-input-2", + Source: requireNewStruct(t, map[string]interface{}{ + "enabled": true, + "type": "log", + "paths": []interface{}{logFilePath}, + }), + }, + }, + }, + }, + }, + } + + // Once the desired state is reached (aka Filebeat finished applying + // the policy changes) we still wait for a little bit before sending + // another policy. This will allow the input to run and get some data + // into the publishing pipeline. + // + // nextState is a helper function that will keep cycling through both + // elements of the `units` slice. Once one is fully applied, we wait + // at least 10s then send the next one. + idx := 0 + waiting := false + when := time.Now() + nextState := func() { + if waiting { + if time.Now().After(when) { + idx = (idx + 1) % len(units) + waiting = false + return + } + return + } + waiting = true + when = time.Now().Add(10 * time.Second) + } + server := &mock.StubServerV2{ + // The Beat will call the check-in function multiple times: + // - At least once at startup + // - At every state change (starting, configuring, healthy, etc) + // for every Unit. + // + // Because of that we can't rely on the number of times it is called + // we need some sort of state machine to handle when to send the next + // policy and when to just re-send the current one. 
+ // + // If the Elastic-Agent wants the Beat to keep running the same policy, + // it will just keep re-sending it every time the Beat calls the check-in + // method. + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + if management.DoesStateMatch(observed, units[idx], 0) { + nextState() + } + for _, unit := range observed.GetUnits() { + if state := unit.GetState(); !(state == proto.State_HEALTHY || state != proto.State_CONFIGURING || state == proto.State_STARTING) { + t.Fatalf("Unit '%s' is not healthy, state: %s", unit.GetId(), unit.GetState().String()) + } + } + return &proto.CheckinExpected{ + Units: units[idx], + } + }, + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + } + + require.NoError(t, server.Start()) + t.Cleanup(server.Stop) + + filebeat := NewBeat( + t, + "../../filebeat.test", + tempDir, + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + ) + + filebeat.Start() + + // waitDeadlineOr5Mins looks at the test deadline + // and returns a reasonable value of waiting for a + // condition to be met. The possible values are: + // - if no test deadline is set, return 5 minutes + // - if a deadline is set and there is less than + // 0.5 second left, return the time left + // - otherwise return the time left minus 0.5 second. 
+ waitDeadlineOr5Min := func() time.Duration { + deadline, deadileSet := t.Deadline() + if deadileSet { + left := deadline.Sub(time.Now()) + final := left - 500*time.Millisecond + if final <= 0 { + return left + } + return final + } + return 5 * time.Minute + } + + require.Eventually(t, func() bool { + return filebeat.LogContains("Can only start an input when all related states are finished") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'Can only start an input when all related states are finished' not found on Filebeat logs") + + require.Eventually(t, func() bool { + return filebeat.LogContains("file 'flog.log' is not finished, will retry starting the input soon") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'file 'flog.log' is not finished, will retry starting the input soon' not found on Filebeat logs") + + require.Eventually(t, func() bool { + return filebeat.LogContains("ForceReload set to TRUE") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'ForceReload set to TRUE' not found on Filebeat logs") + + require.Eventually(t, func() bool { + return filebeat.LogContains("Reloading Beats inputs because forceReload is true") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'Reloading Beats inputs because forceReload is true' not found on Filebeat logs") + + require.Eventually(t, func() bool { + return filebeat.LogContains("ForceReload set to FALSE") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'ForceReload set to FALSE' not found on Filebeat logs") + + // Set it to true, so the temporary directory is removed + testSucceeded = true +} + +func requireNewStruct(t *testing.T, v map[string]interface{}) *structpb.Struct { + str, err := structpb.NewStruct(v) + if err != nil { + require.NoError(t, err) + } + return str +} + +// generateLogFile generates a log file by appending the current +// time to it every second. 
+func generateLogFile(t *testing.T, fullPath string) { + t.Helper() + f, err := os.Create(fullPath) + if err != nil { + t.Fatalf("could not create file '%s: %s", fullPath, err) + } + + go func() { + t.Helper() + ticker := time.NewTicker(time.Second) + t.Cleanup(ticker.Stop) + + done := make(chan struct{}) + t.Cleanup(func() { close(done) }) + + defer func() { + if err := f.Close(); err != nil { + t.Errorf("could not close log file '%s': %s", fullPath, err) + } + }() + + for { + select { + case <-done: + return + case now := <-ticker.C: + _, err := fmt.Fprintln(f, now.Format(time.RFC3339)) + if err != nil { + // The Go compiler does not allow me to call t.Fatalf from a non-test + // goroutine, so just log it instead + t.Errorf("could not write data to log file '%s': %s", fullPath, err) + return + } + // make sure log lines are synced as quickly as possible + if err := f.Sync(); err != nil { + t.Errorf("could not sync file '%s': %s", fullPath, err) + } + } + } + }() +} + +func ensureESIsRunning(t *testing.T) { + t.Helper() + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Second)) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:9200", nil) + if err != nil { + t.Fatalf("cannot create request to ensure ES is running: %s", err) + } + + user := os.Getenv("ES_USER") + if user == "" { + user = "admin" + } + + pass := os.Getenv("ES_PASS") + if pass == "" { + pass = "testing" + } + + req.SetBasicAuth(user, pass) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + // If you're reading this message, you probably forgot to start ES + // run `mage compose:Up` from Filebeat's folder to start all + // containers required for integration tests + t.Fatalf("cannot execute HTTP request to ES: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected HTTP status: %d, expecting 200 - OK", resp.StatusCode) + } +} diff --git a/x-pack/libbeat/management/config.go 
b/x-pack/libbeat/management/config.go index ca5bc72b79a4..82986031465f 100644 --- a/x-pack/libbeat/management/config.go +++ b/x-pack/libbeat/management/config.go @@ -5,15 +5,18 @@ package management import ( + "errors" + "github.com/elastic/beats/v7/libbeat/common/reload" conf "github.com/elastic/elastic-agent-libs/config" ) // Config for central management type Config struct { - Enabled bool `config:"enabled" yaml:"enabled"` - Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"` - RestartOnOutputChange bool `config:"restart_on_output_change" yaml:"restart_on_output_change"` + Enabled bool `config:"enabled" yaml:"enabled"` + Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"` + RestartOnOutputChange bool `config:"restart_on_output_change" yaml:"restart_on_output_change"` + InsecureGRPCURLForTesting string `config:"insecure_grpc_url_for_testing" yaml:"insecure_grpc_url_for_testing"` } // ConfigBlock stores a piece of config from central management @@ -56,3 +59,10 @@ func (c *ConfigBlock) ConfigWithMeta() (*reload.ConfigWithMeta, error) { Config: config, }, nil } + +func (c Config) Validate() error { + if !c.Enabled && c.InsecureGRPCURLForTesting != "" { + return errors.New("'management.insecure_grpc_url_for_testing' can only be used if 'management.enabled' is set.") + } + return nil +} diff --git a/x-pack/libbeat/management/input_reload_test.go b/x-pack/libbeat/management/input_reload_test.go new file mode 100644 index 000000000000..972c42908f11 --- /dev/null +++ b/x-pack/libbeat/management/input_reload_test.go @@ -0,0 +1,207 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package management + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/joeshaw/multierror" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func TestInputReload(t *testing.T) { + // Uncomment the line below to see the debug logs for this test + // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*", "centralmgmt.V2-manager")) + r := reload.NewRegistry() + + output := &reloadable{} + r.MustRegisterOutput(output) + + reloadCallCount := 0 + inputs := &reloadableListMock{ + ReloadImpl: func(configs []*reload.ConfigWithMeta) error { + reloadCallCount++ + if reloadCallCount == 1 { + e1 := multierror.Errors{fmt.Errorf("%w", &common.ErrInputNotFinished{ + State: "", + File: "/tmp/foo.log", + })} + return e1.Err() + } + + return nil + }, + } + r.MustRegisterInput(inputs) + + configIdx := -1 + onObserved := func(observed *proto.CheckinObserved, currentIdx int) { + configIdx = currentIdx + } + + srv := mockSrv([][]*proto.UnitExpected{ + { + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "log-input", + Type: "log", + Name: "log", + Streams: []*proto.Stream{ + { + Id: "log-input-1", + Source: requireNewStruct(t, map[string]interface{}{ + "paths": []interface{}{"/tmp/foo.log"}, + }), + }, + }, + }, + }, + }, + { + { + Id: "output-unit", + Type: 
proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "log-input-2", + Type: "log", + Name: "log", + Streams: []*proto.Stream{ + { + Id: "log-input-2", + Source: requireNewStruct(t, map[string]interface{}{ + "paths": []interface{}{"/tmp/foo.log"}, + }), + }, + }, + }, + }, + }, + }, + []uint64{1, 1}, + []*proto.Features{ + nil, + nil, + }, + onObserved, + 500*time.Millisecond, + ) + require.NoError(t, srv.Start()) + defer srv.Stop() + + client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{ + Name: "program", + Version: "v1.0.0", + Meta: map[string]string{ + "key": "value", + }, + }, grpc.WithTransportCredentials(insecure.NewCredentials())) + + m, err := NewV2AgentManagerWithClient( + &Config{ + Enabled: true, + }, + r, + client, + WithChangeDebounce(300*time.Millisecond), + WithForceReloadDebounce(800*time.Millisecond), + ) + require.NoError(t, err) + + mm := m.(*BeatV2Manager) + + err = m.Start() + require.NoError(t, err) + defer m.Stop() + + forceReloadStates := []bool{false, true, false} + forceReloadStateIdx := 0 + forceReloadLastState := true // starts on true so the first iteration is already a change + + eventuallyCheck := func() bool { + forceReload := mm.forceReload + // That detects a state change, we only count/advance steps + // on state changes + if forceReload != forceReloadLastState { + forceReloadLastState = forceReload + if forceReload == forceReloadStates[forceReloadStateIdx] { + // Set to the next state + forceReloadStateIdx++ + } + + // If we went through all states, then succeed + if forceReloadStateIdx == len(forceReloadStates) { + // If we went through all states + 
if configIdx == 1 { + return true + } + } + } + + return false + } + + require.Eventually(t, eventuallyCheck, 20*time.Second, 300*time.Millisecond, + "the expected changes on forceReload did not happen") +} + +type reloadableListMock struct { + mx sync.Mutex + configs []*reload.ConfigWithMeta + ReloadImpl func(configs []*reload.ConfigWithMeta) error +} + +func (r *reloadableListMock) Reload(configs []*reload.ConfigWithMeta) error { + r.mx.Lock() + defer r.mx.Unlock() + return r.ReloadImpl(configs) +} + +func (r *reloadableListMock) Configs() []*reload.ConfigWithMeta { + r.mx.Lock() + defer r.mx.Unlock() + return r.configs +} diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 922170177d71..70db3fd88b08 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -6,6 +6,7 @@ package management import ( "context" + "errors" "fmt" "os" "os/signal" @@ -15,9 +16,12 @@ import ( "github.com/joeshaw/multierror" "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" gproto "google.golang.org/protobuf/proto" "gopkg.in/yaml.v2" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/features" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -49,9 +53,10 @@ type BeatV2Manager struct { errCanceller context.CancelFunc // track individual units given to us by the V2 API - mx sync.Mutex - units map[unitKey]*client.Unit - actions []client.Action + mx sync.Mutex + units map[unitKey]*client.Unit + actions []client.Action + forceReload bool // status is reported as a whole for every unit sent to this component // hopefully this can be improved in the future to be separated per unit @@ -84,6 +89,13 @@ type BeatV2Manager struct { lastBeatOutputCfg *reload.ConfigWithMeta lastBeatInputCfgs []*reload.ConfigWithMeta lastBeatFeaturesCfg *conf.C + + // changeDebounce is 
the debounce time for a configuration change + changeDebounce time.Duration + // forceReloadDebounce is the time the manager will wait before + // trying to reload the configuration after an input not finished error + // happens + forceReloadDebounce time.Duration } // ================================ @@ -95,6 +107,20 @@ func WithStopOnEmptyUnits(m *BeatV2Manager) { m.stopOnEmptyUnits = true } +// WithChangeDebounce sets the changeDebounce value +func WithChangeDebounce(d time.Duration) func(b *BeatV2Manager) { + return func(b *BeatV2Manager) { + b.changeDebounce = d + } +} + +// WithForceReloadDebounce sets the forceReloadDebounce value +func WithForceReloadDebounce(d time.Duration) func(b *BeatV2Manager) { + return func(b *BeatV2Manager) { + b.forceReloadDebounce = d + } +} + // ================================ // Init Functions // ================================ @@ -109,22 +135,38 @@ func init() { // This is registered as the manager factory in init() so that calls to // lbmanagement.NewManager will be forwarded here. 
func NewV2AgentManager(config *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) { + logger := logp.NewLogger(lbmanagement.DebugK).Named("V2-manager") c := DefaultConfig() if config.Enabled() { if err := config.Unpack(&c); err != nil { return nil, fmt.Errorf("parsing fleet management settings: %w", err) } } - agentClient, _, err := client.NewV2FromReader(os.Stdin, client.VersionInfo{ - Name: "beat-v2-client", - Version: version.GetDefaultVersion(), - Meta: map[string]string{ - "commit": version.Commit(), - "build_time": version.BuildTime().String(), - }, - }) - if err != nil { - return nil, fmt.Errorf("error reading control config from agent: %w", err) + + var agentClient client.V2 + var err error + if c.InsecureGRPCURLForTesting != "" && c.Enabled { + // Insecure for testing Elastic-Agent-Client initialisation + logger.Info("Using INSECURE GRPC connection, this should be only used for testing!") + agentClient = client.NewV2(c.InsecureGRPCURLForTesting, + "", // Insecure connection for test, no token needed + client.VersionInfo{ + Name: "beat-v2-client-for-testing", + Version: version.GetDefaultVersion(), + }, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + // Normal Elastic-Agent-Client initialisation + agentClient, _, err = client.NewV2FromReader(os.Stdin, client.VersionInfo{ + Name: "beat-v2-client", + Version: version.GetDefaultVersion(), + Meta: map[string]string{ + "commit": version.Commit(), + "build_time": version.BuildTime().String(), + }, + }) + if err != nil { + return nil, fmt.Errorf("error reading control config from agent: %w", err) + } } // officially running under the elastic-agent; we set the publisher pipeline @@ -150,6 +192,12 @@ func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agen status: lbmanagement.Running, message: "Healthy", stopChan: make(chan struct{}, 1), + changeDebounce: time.Second, + // forceReloadDebounce is greater than changeDebounce because it is only + // used 
when an input has not reached its finished state, this means some events + // still need to be acked by the acker, hence the longer we wait the more likely + // for the input to have reached its finished state. + forceReloadDebounce: time.Second * 10, } if config.Enabled { @@ -378,15 +426,13 @@ func (cm *BeatV2Manager) watchErrChan(ctx context.Context) { } func (cm *BeatV2Manager) unitListen() { - const changeDebounce = 100 * time.Millisecond - // register signal handler sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) // timer is used to provide debounce on unit changes // this allows multiple changes to come in and only a single reload be performed - t := time.NewTimer(changeDebounce) + t := time.NewTimer(cm.changeDebounce) t.Stop() // starts stopped, until a change occurs cm.logger.Debug("Listening for agent unit changes") @@ -411,7 +457,8 @@ func (cm *BeatV2Manager) unitListen() { return case change := <-cm.client.UnitChanges(): cm.logger.Infof( - "BeatV2Manager.unitListen UnitChanged.Type(%s), UnitChanged.Trigger(%d): %s/%s", + "BeatV2Manager.unitListen UnitChanged.ID(%s), UnitChanged.Type(%s), UnitChanged.Trigger(%d): %s/%s", + change.Unit.ID(), change.Type, int64(change.Triggers), change.Type, change.Triggers) switch change.Type { @@ -420,11 +467,11 @@ func (cm *BeatV2Manager) unitListen() { case client.UnitChangedAdded: cm.addUnit(change.Unit) // reset can be called here because `<-t.C` is handled in the same select - t.Reset(changeDebounce) + t.Reset(cm.changeDebounce) case client.UnitChangedModified: cm.modifyUnit(change.Unit) // reset can be called here because `<-t.C` is handled in the same select - t.Reset(changeDebounce) + t.Reset(cm.changeDebounce) case client.UnitChangedRemoved: cm.deleteUnit(change.Unit) } @@ -439,6 +486,10 @@ func (cm *BeatV2Manager) unitListen() { } cm.mx.Unlock() cm.reload(units) + if cm.forceReload { + // Restart the debounce timer so we try to reload the inputs. 
+ t.Reset(cm.forceReloadDebounce) + } } } } @@ -642,15 +693,62 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error { inputBeatCfgs = append(inputBeatCfgs, inputCfg...) } - if !didChange(cm.lastInputCfgs, inputCfgs) { + if !didChange(cm.lastInputCfgs, inputCfgs) && !cm.forceReload { cm.logger.Debug("Skipped reloading input units; configuration didn't change") return nil } - err := obj.Reload(inputBeatCfgs) - if err != nil { - return fmt.Errorf("failed to reloading inputs: %w", err) + if cm.forceReload { + cm.logger.Info("Reloading Beats inputs because forceReload is true. " + + "Set log level to debug to get more information about which " + + "inputs are causing this.") + } + + if err := obj.Reload(inputBeatCfgs); err != nil { + merror := &multierror.MultiError{} + realErrors := multierror.Errors{} + + // At the moment this logic is tightly bound to the current RunnerList + // implementation from libbeat/cfgfile/list.go and Input.loadStates from + // filebeat/input/log/input.go. + // If they change the way they report errors, this will break. + // TODO (Tiago): update all layers to use the most recent features from + // the standard library errors package. + if errors.As(err, &merror) { + for _, err := range merror.Errors { + causeErr := errors.Unwrap(err) + // A Log input is only marked as finished when all events it + // produced are acked by the acker so when we see this error, + // we just retry until the new input can be started. 
+ // This is the same logic used by the standalone configuration file + // reloader implemented on libbeat/cfgfile/reload.go + inputNotFinishedErr := &common.ErrInputNotFinished{} + if ok := errors.As(causeErr, &inputNotFinishedErr); ok { + cm.logger.Debugf("file '%s' is not finished, will retry starting the input soon", inputNotFinishedErr.File) + cm.forceReload = true + cm.logger.Debug("ForceReload set to TRUE") + continue + } + + // This is an error that cannot be ignored, so we report it + realErrors = append(realErrors, err) + } + } + + if len(realErrors) != 0 { + return fmt.Errorf("failed to reload inputs: %w", realErrors.Err()) + } + } else { + // If there was no error reloading input and forceReload was + // true, then set it to false. This prevents unnecessary logging + // and makes it clear this was the moment when the input reload + // finally worked. + if cm.forceReload { + cm.forceReload = false + cm.logger.Debug("ForceReload set to FALSE") + } } + cm.lastInputCfgs = inputCfgs cm.lastBeatInputCfgs = inputBeatCfgs return nil diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 952263a43fbc..538dd0c58023 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -25,7 +25,6 @@ import ( ) func TestManagerV2(t *testing.T) { - r := reload.NewRegistry() output := &reloadable{} @@ -239,7 +238,7 @@ func mockSrv( if observedCallback != nil { observedCallback(observed, i) } - matches := doesStateMatch(observed, units[i], featuresIdxs[i]) + matches := DoesStateMatch(observed, units[i], featuresIdxs[i]) if !matches { // send same set of units and features return &proto.CheckinExpected{ @@ -274,35 +273,6 @@ func mockSrv( } } -func doesStateMatch( - observed *proto.CheckinObserved, - expectedUnits []*proto.UnitExpected, - expectedFeaturesIdx uint64, -) bool { - if len(observed.Units) != len(expectedUnits) { - return false - } - expectedMap := 
make(map[unitKey]*proto.UnitExpected) - for _, exp := range expectedUnits { - expectedMap[unitKey{client.UnitType(exp.Type), exp.Id}] = exp - } - for _, unit := range observed.Units { - exp, ok := expectedMap[unitKey{client.UnitType(unit.Type), unit.Id}] - if !ok { - return false - } - if unit.State != exp.State || unit.ConfigStateIdx != exp.ConfigStateIdx { - return false - } - } - - if observed.FeaturesIdx != expectedFeaturesIdx { - return false - } - - return true -} - type reloadable struct { mx sync.Mutex config *reload.ConfigWithMeta diff --git a/x-pack/libbeat/management/test_utils.go b/x-pack/libbeat/management/test_utils.go new file mode 100644 index 000000000000..fba637154b5c --- /dev/null +++ b/x-pack/libbeat/management/test_utils.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +// DoesStateMatch returns true if the observed state matches +// the expectedUnits and Features. +func DoesStateMatch( + observed *proto.CheckinObserved, + expectedUnits []*proto.UnitExpected, + expectedFeaturesIdx uint64, +) bool { + if len(observed.Units) != len(expectedUnits) { + return false + } + expectedMap := make(map[unitKey]*proto.UnitExpected) + for _, exp := range expectedUnits { + expectedMap[unitKey{client.UnitType(exp.Type), exp.Id}] = exp + } + for _, unit := range observed.Units { + exp, ok := expectedMap[unitKey{client.UnitType(unit.Type), unit.Id}] + if !ok { + return false + } + if unit.State != exp.State || unit.ConfigStateIdx != exp.ConfigStateIdx { + return false + } + } + + return observed.FeaturesIdx == expectedFeaturesIdx +}