diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 115ea2048d4..361417994e0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Sanitize filenames for request tracer in cel input. {pull}35154[35154] - Fix accidental error overwrite in defer statement in entityanalytics Azure AD input. {issue}35153[35153] {pull}35169[35169] - Fixing the grok expression outputs of log files {pull}35221[35221] +- Fixes "Can only start an input when all related states are finished" error when running under Elastic-Agent {pull}35250[35250] {issue}33653[33653] - Move repeated Windows event channel not found errors in winlog input to debug level. {issue}35314[35314] {pull}35317[35317] - Fix crash when processing forwarded logs missing a message. {issue}34705[34705] {pull}34865[34865] - Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433] diff --git a/Jenkinsfile b/Jenkinsfile index 3d8f004297f..1da726b38cd 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -707,7 +707,7 @@ def withBeatsEnv(Map args = [:], Closure body) { error("Error '${err.toString()}'") } finally { if (archive) { - archiveArtifacts(allowEmptyArchive: true, artifacts: "${directory}/build/system-tests/docker-logs/TEST-docker-compose-*.log") + archiveArtifacts(allowEmptyArchive: true, artifacts: "${directory}/build/system-tests/docker-logs/TEST-docker-compose-*.log, ${directory}/build/integration-tests/**/**") archiveTestOutput(directory: directory, testResults: testResults, artifacts: artifacts, id: args.id, upload: upload) } tearDown() diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index eeaa2de0c91..4882735d36b 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -93,7 +93,9 @@ func NewInput( cleanupNeeded := true cleanupIfNeeded := func(f func() error) { if cleanupNeeded { - f() + if err := f(); err != nil { + logp.L().Named("input.log").Errorf("clean up function returned an error: %w", err) + } } } @@ -103,10 +105,10 @@ func NewInput( return nil, err } if err := inputConfig.resolveRecursiveGlobs(); err != nil { - return nil, fmt.Errorf("Failed to resolve recursive globs in config: %v", err) + return nil, fmt.Errorf("Failed to resolve recursive globs in config: %w", err) } if err := inputConfig.normalizeGlobPatterns(); err != nil { - return nil, fmt.Errorf("Failed to normalize globs patterns: %v", err) + return nil, fmt.Errorf("Failed to normalize globs patterns: %w", err) } if len(inputConfig.Paths) == 0 { @@ -115,7 +117,7 @@ func NewInput( identifier, err := file.NewStateIdentifier(inputConfig.FileIdentity) if err != nil { - return nil, fmt.Errorf("failed to initialize file identity generator: %+v", err) + return nil, fmt.Errorf("failed to initialize file identity generator: %w", err) } // Note: underlying output. @@ -192,7 +194,10 @@ func (p *Input) loadStates(states []file.State) error { // In case a input is tried to be started with an unfinished state matching the glob pattern if !state.Finished { - return &common.ErrInputNotFinished{State: state.String()} + return &common.ErrInputNotFinished{ + State: state.String(), + File: state.Fileinfo.Name(), + } } // Convert state to current identifier if different @@ -466,7 +471,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) { var absolutePath string absolutePath, err = filepath.Abs(path) if err != nil { - return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %s", absolutePath, err) + return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %w", absolutePath, err) } p.logger.Debugf("Check file for harvesting: %s", absolutePath) // Create new state for comparison @@ -548,7 +553,7 @@ func (p *Input) scan() { if isNewState { logger.Debugf("Start harvester for new file: %s", newState.Source) err := p.startHarvester(logger, newState, 0) - if err == errHarvesterLimit { + if errors.Is(err, errHarvesterLimit) { logger.Debugf(harvesterErrMsg, newState.Source, err) continue } @@ -673,11 +678,7 @@ func (p *Input) isIgnoreOlder(state file.State) bool { } modTime := state.Fileinfo.ModTime() - if time.Since(modTime) > p.config.IgnoreOlder { - return true - } - - return false + return time.Since(modTime) > p.config.IgnoreOlder } // isCleanInactive checks if the given state false under clean_inactive @@ -688,11 +689,7 @@ func (p *Input) isCleanInactive(state file.State) bool { } modTime := state.Fileinfo.ModTime() - if time.Since(modTime) > p.config.CleanInactive { - return true - } - - return false + return time.Since(modTime) > p.config.CleanInactive } // subOutletWrap returns a factory method that will wrap the passed outlet @@ -748,7 +745,7 @@ func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int err = h.Setup() if err != nil { p.numHarvesters.Dec() - return fmt.Errorf("error setting up harvester: %s", err) + return fmt.Errorf("error setting up harvester: %w", err) } // Update state before staring harvester @@ -783,7 +780,7 @@ func (p *Input) updateState(state file.State) error { stateToRemove := file.State{Id: state.PrevId, TTL: 0, Finished: true, Meta: nil} err := p.doUpdate(stateToRemove) if err != nil { - return fmt.Errorf("failed to remove outdated states based on prev_id: %v", err) + return fmt.Errorf("failed to remove outdated states based on prev_id: %w", err) } } diff --git a/libbeat/common/errors.go b/libbeat/common/errors.go index 68fecb8f550..9f5248e815e 100644 --- a/libbeat/common/errors.go +++ b/libbeat/common/errors.go @@ -24,6 +24,7 @@ import ( // ErrInputNotFinished struct for reporting errors related to not finished inputs type ErrInputNotFinished struct { State string + File string } // Error method of ErrInputNotFinished diff --git a/x-pack/filebeat/magefile.go b/x-pack/filebeat/magefile.go index 0474e41e61e..bf8f0e7040f 100644 --- a/x-pack/filebeat/magefile.go +++ b/x-pack/filebeat/magefile.go @@ -179,6 +179,7 @@ func IntegTest() { // GoIntegTest starts the docker containers and executes the Go integration tests. func GoIntegTest(ctx context.Context) error { + mg.Deps(BuildSystemTestBinary) return devtools.GoIntegTestFromHost(ctx, devtools.DefaultGoTestIntegrationFromHostArgs()) } diff --git a/x-pack/filebeat/tests/integration/framework_test.go b/x-pack/filebeat/tests/integration/framework_test.go new file mode 100644 index 00000000000..1a65eb96da8 --- /dev/null +++ b/x-pack/filebeat/tests/integration/framework_test.go @@ -0,0 +1,136 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bufio" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +type BeatProc struct { + Binary string + Args []string + Cmd *exec.Cmd + t *testing.T + tempDir string +} + +// NewBeat createa a new Beat process from the system tests binary. +// It sets some required options like the home path, logging, etc. +// `tempDir` will be used as home and logs directory for the Beat +// `args` will be passed as CLI arguments to the Beat +func NewBeat(t *testing.T, binary string, tempDir string, args ...string) BeatProc { + p := BeatProc{ + t: t, + Binary: binary, + Args: append([]string{ + "--systemTest", + "--path.home", tempDir, + "--path.logs", tempDir, + "-E", "logging.to_files=true", + "-E", "logging.files.rotateeverybytes=104857600", // About 100MB + }, args...), + tempDir: tempDir, + } + return p +} + +// Start starts the Beat process +func (b *BeatProc) Start() { + t := b.t + fullPath, err := filepath.Abs(b.Binary) + if err != nil { + t.Fatalf("could not get full path from %q, err: %s", b.Binary, err) + } + b.Cmd = exec.Command(fullPath, b.Args...) + + if err := b.Cmd.Start(); err != nil { + t.Fatalf("could not start process: %s", err) + } + t.Cleanup(func() { + pid := b.Cmd.Process.Pid + if err := b.Cmd.Process.Kill(); err != nil { + t.Fatalf("could not stop process with PID: %d, err: %s", pid, err) + } + }) +} + +// LogContains looks for `s` as a substring of every log line, +// it will open the log file on every call, read it until EOF, +// then close it. +func (b *BeatProc) LogContains(s string) bool { + t := b.t + logFile := b.openLogFile() + defer func() { + if err := logFile.Close(); err != nil { + // That's not quite a test error, but it can impact + // next executions of LogContains, so treat it as an error + t.Errorf("could not close log file: %s", err) + } + }() + + r := bufio.NewReader(logFile) + for { + line, err := r.ReadString('\n') + if err != nil { + if err != io.EOF { + t.Fatalf("error reading log file '%s': %s", logFile.Name(), err) + } + break + } + if strings.Contains(line, s) { + return true + } + } + + return false +} + +// openLogFile opens the log file for reading and returns it. +// It also registers a cleanup function to close the file +// when the test ends. +func (b *BeatProc) openLogFile() *os.File { + t := b.t + glob := fmt.Sprintf("%s-*.ndjson", filepath.Join(b.tempDir, "filebeat")) + files, err := filepath.Glob(glob) + if err != nil { + t.Fatalf("could not expand log file glob: %s", err) + } + + require.Eventually(t, func() bool { + files, err = filepath.Glob(glob) + if err != nil { + t.Fatalf("could not expand log file glob: %s", err) + } + return len(files) == 1 + }, 5*time.Second, 100*time.Millisecond, + "waiting for log file matching glob '%s' to be created", glob) + + // On a normal operation there must be a single log, if there are more + // than one, then there is an issue and the Beat is logging too much, + // which is enough to stop the test + if len(files) != 1 { + t.Fatalf("there must be only one log file for %s, found: %d", + glob, len(files)) + } + + f, err := os.Open(files[0]) + if err != nil { + t.Fatalf("could not open log file '%s': %s", files[0], err) + } + + return f +} diff --git a/x-pack/filebeat/tests/integration/input_reload_test.go b/x-pack/filebeat/tests/integration/input_reload_test.go new file mode 100644 index 00000000000..cab3b273a9b --- /dev/null +++ b/x-pack/filebeat/tests/integration/input_reload_test.go @@ -0,0 +1,364 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "context" + "fmt" + "net/http" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/structpb" + + "github.com/elastic/beats/v7/x-pack/libbeat/management" + "github.com/elastic/elastic-agent-client/v7/pkg/client/mock" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +// TestInputReloadUnderElasticAgent will start a Filebeat and cause the input +// reload issue described on https://github.com/elastic/beats/issues/33653. +// In short, a new input for a file needs to be started while there are still +// events from that file in the publishing pipeline, effectively keeping +// the harvester status as `finished: false`, which prevents the new input +// from starting. +// +// This tests ensures Filebeat can gracefully recover from this situation +// and will eventually re-start harvesting the file. +// +// In case of a test failure the directory with Filebeat logs and +// all other supporting files will be kept on build/integration-tests. +// +// Run the tests with -v flag to print the temporary folder used. +func TestInputReloadUnderElasticAgent(t *testing.T) { + // First things first, ensure ES is running and we can connect to it. + // If ES is not running, the test will timeout and the only way to know + // what caused it is going through Filebeat's logs. + ensureESIsRunning(t) + + // We create our own temp dir so the files can be persisted + // in case the test fails. This will help debugging issues + // locally and on CI. + // + // testSucceeded will be set to 'true' as the very last thing on this test, + // it allows us to use t.CleanUp to remove the temporary files + testSucceeded := false + tempDir, err := filepath.Abs(filepath.Join("../../build/integration-tests/", + fmt.Sprintf("%s-%d", t.Name(), time.Now().Unix()))) + if err != nil { + t.Fatal(err) + } + + if err := os.MkdirAll(tempDir, 0766); err != nil { + t.Fatalf("cannot create tmp dir: %s, msg: %s", err, err.Error()) + } + t.Logf("Temporary directory: %s", tempDir) + t.Cleanup(func() { + if testSucceeded { + if err := os.RemoveAll(tempDir); err != nil { + t.Fatalf("could not remove temp dir '%s': %s", tempDir, err) + } + t.Logf("Temporary directory '%s' removed", tempDir) + } + }) + + logFilePath := filepath.Join(tempDir, "flog.log") + generateLogFile(t, logFilePath) + var units = [][]*proto.UnitExpected{ + { + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + Source: requireNewStruct(t, + map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"http://localhost:9200"}, + "username": "admin", + "password": "testing", + "protocol": "http", + "enabled": true, + }), + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "log-input", + Type: "log", + Name: "log", + Streams: []*proto.Stream{ + { + Id: "log-input-1", + Source: requireNewStruct(t, map[string]interface{}{ + "enabled": true, + "type": "log", + "paths": []interface{}{logFilePath}, + }), + }, + }, + }, + }, + }, + { + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + Source: requireNewStruct(t, + map[string]interface{}{ + "type": "elasticsearch", + "hosts": []interface{}{"http://localhost:9200"}, + "username": "admin", + "password": "testing", + "protocol": "http", + "enabled": true, + }), + }, + }, + { + Id: "input-unit-2", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "log-input", + Type: "log", + Name: "log", + Streams: []*proto.Stream{ + { + Id: "log-input-2", + Source: requireNewStruct(t, map[string]interface{}{ + "enabled": true, + "type": "log", + "paths": []interface{}{logFilePath}, + }), + }, + }, + }, + }, + }, + } + + // Once the desired state is reached (aka Filebeat finished applying + // the policy changes) we still wait for a little bit before sending + // another policy. This will allow the input to run and get some data + // into the publishing pipeline. + // + // nextState is a helper function that will keep cycling through both + // elements of the `units` slice. Once one is fully applied, we wait + // at least 10s then send the next one. + idx := 0 + waiting := false + when := time.Now() + nextState := func() { + if waiting { + if time.Now().After(when) { + idx = (idx + 1) % len(units) + waiting = false + return + } + return + } + waiting = true + when = time.Now().Add(10 * time.Second) + } + server := &mock.StubServerV2{ + // The Beat will call the check-in function multiple times: + // - At least once at startup + // - At every state change (starting, configuring, healthy, etc) + // for every Unit. + // + // Because of that we can't rely on the number of times it is called + // we need some sort of state machine to handle when to send the next + // policy and when to just re-send the current one. + // + // If the Elastic-Agent wants the Beat to keep running the same policy, + // it will just keep re-sending it every time the Beat calls the check-in + // method. + CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { + if management.DoesStateMatch(observed, units[idx], 0) { + nextState() + } + for _, unit := range observed.GetUnits() { + if state := unit.GetState(); !(state == proto.State_HEALTHY || state != proto.State_CONFIGURING || state == proto.State_STARTING) { + t.Fatalf("Unit '%s' is not healthy, state: %s", unit.GetId(), unit.GetState().String()) + } + } + return &proto.CheckinExpected{ + Units: units[idx], + } + }, + ActionImpl: func(response *proto.ActionResponse) error { return nil }, + } + + require.NoError(t, server.Start()) + t.Cleanup(server.Stop) + + filebeat := NewBeat( + t, + "../../filebeat.test", + tempDir, + "-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port), + "-E", "management.enabled=true", + ) + + filebeat.Start() + + // waitDeadlineOr5Mins looks at the test deadline + // and returns a reasonable value of waiting for a + // condition to be met. The possible values are: + // - if no test deadline is set, return 5 minutes + // - if a deadline is set and there is less than + // 0.5 second left, return the time left + // - otherwise return the time left minus 0.5 second. + waitDeadlineOr5Min := func() time.Duration { + deadline, deadileSet := t.Deadline() + if deadileSet { + left := deadline.Sub(time.Now()) + final := left - 500*time.Millisecond + if final <= 0 { + return left + } + return final + } + return 5 * time.Minute + } + + require.Eventually(t, func() bool { + return filebeat.LogContains("Can only start an input when all related states are finished") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'Can only start an input when all related states are finished' not found on Filebeat logs") + + require.Eventually(t, func() bool { + return filebeat.LogContains("file 'flog.log' is not finished, will retry starting the input soon") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'file 'flog.log' is not finished, will retry starting the input soon' not found on Filebeat logs") + + require.Eventually(t, func() bool { + return filebeat.LogContains("ForceReload set to TRUE") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'ForceReload set to TRUE' not found on Filebeat logs") + + require.Eventually(t, func() bool { + return filebeat.LogContains("Reloading Beats inputs because forceReload is true") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'Reloading Beats inputs because forceReload is true' not found on Filebeat logs") + + require.Eventually(t, func() bool { + return filebeat.LogContains("ForceReload set to FALSE") + }, waitDeadlineOr5Min(), 100*time.Millisecond, + "String 'ForceReload set to FALSE' not found on Filebeat logs") + + // Set it to true, so the temporary directory is removed + testSucceeded = true +} + +func requireNewStruct(t *testing.T, v map[string]interface{}) *structpb.Struct { + str, err := structpb.NewStruct(v) + if err != nil { + require.NoError(t, err) + } + return str +} + +// generateLogFile generates a log file by appending the current +// time to it every second. +func generateLogFile(t *testing.T, fullPath string) { + t.Helper() + f, err := os.Create(fullPath) + if err != nil { + t.Fatalf("could not create file '%s: %s", fullPath, err) + } + + go func() { + t.Helper() + ticker := time.NewTicker(time.Second) + t.Cleanup(ticker.Stop) + + done := make(chan struct{}) + t.Cleanup(func() { close(done) }) + + defer func() { + if err := f.Close(); err != nil { + t.Errorf("could not close log file '%s': %s", fullPath, err) + } + }() + + for { + select { + case <-done: + return + case now := <-ticker.C: + _, err := fmt.Fprintln(f, now.Format(time.RFC3339)) + if err != nil { + // The Go compiler does not allow me to call t.Fatalf from a non-test + // goroutine, so just log it instead + t.Errorf("could not write data to log file '%s': %s", fullPath, err) + return + } + // make sure log lines are synced as quickly as possible + if err := f.Sync(); err != nil { + t.Errorf("could not sync file '%s': %s", fullPath, err) + } + } + } + }() +} + +func ensureESIsRunning(t *testing.T) { + t.Helper() + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(500*time.Second)) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://localhost:9200", nil) + if err != nil { + t.Fatalf("cannot create request to ensure ES is running: %s", err) + } + + user := os.Getenv("ES_USER") + if user == "" { + user = "admin" + } + + pass := os.Getenv("ES_PASS") + if pass == "" { + pass = "testing" + } + + req.SetBasicAuth(user, pass) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + // If you're reading this message, you probably forgot to start ES + // run `mage compose:Up` from Filebeat's folder to start all + // containers required for integration tests + t.Fatalf("cannot execute HTTP request to ES: %s", err) + } + if resp.StatusCode != http.StatusOK { + t.Errorf("unexpected HTTP status: %d, expecting 200 - OK", resp.StatusCode) + } +} diff --git a/x-pack/libbeat/management/config.go b/x-pack/libbeat/management/config.go index ca5bc72b79a..82986031465 100644 --- a/x-pack/libbeat/management/config.go +++ b/x-pack/libbeat/management/config.go @@ -5,15 +5,18 @@ package management import ( + "errors" + "github.com/elastic/beats/v7/libbeat/common/reload" conf "github.com/elastic/elastic-agent-libs/config" ) // Config for central management type Config struct { - Enabled bool `config:"enabled" yaml:"enabled"` - Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"` - RestartOnOutputChange bool `config:"restart_on_output_change" yaml:"restart_on_output_change"` + Enabled bool `config:"enabled" yaml:"enabled"` + Blacklist ConfigBlacklistSettings `config:"blacklist" yaml:"blacklist"` + RestartOnOutputChange bool `config:"restart_on_output_change" yaml:"restart_on_output_change"` + InsecureGRPCURLForTesting string `config:"insecure_grpc_url_for_testing" yaml:"insecure_grpc_url_for_testing"` } // ConfigBlock stores a piece of config from central management @@ -56,3 +59,10 @@ func (c *ConfigBlock) ConfigWithMeta() (*reload.ConfigWithMeta, error) { Config: config, }, nil } + +func (c Config) Validate() error { + if !c.Enabled && c.InsecureGRPCURLForTesting != "" { + return errors.New("'management.insecure_grpc_url_for_testing' can only be used if 'management.enabled' is set.") + } + return nil +} diff --git a/x-pack/libbeat/management/input_reload_test.go b/x-pack/libbeat/management/input_reload_test.go new file mode 100644 index 00000000000..972c42908f1 --- /dev/null +++ b/x-pack/libbeat/management/input_reload_test.go @@ -0,0 +1,207 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/joeshaw/multierror" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +func TestInputReload(t *testing.T) { + // Uncomment the line below to see the debug logs for this test + // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*", "centralmgmt.V2-manager")) + r := reload.NewRegistry() + + output := &reloadable{} + r.MustRegisterOutput(output) + + reloadCallCount := 0 + inputs := &reloadableListMock{ + ReloadImpl: func(configs []*reload.ConfigWithMeta) error { + reloadCallCount++ + if reloadCallCount == 1 { + e1 := multierror.Errors{fmt.Errorf("%w", &common.ErrInputNotFinished{ + State: "", + File: "/tmp/foo.log", + })} + return e1.Err() + } + + return nil + }, + } + r.MustRegisterInput(inputs) + + configIdx := -1 + onObserved := func(observed *proto.CheckinObserved, currentIdx int) { + configIdx = currentIdx + } + + srv := mockSrv([][]*proto.UnitExpected{ + { + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "log-input", + Type: "log", + Name: "log", + Streams: []*proto.Stream{ + { + Id: "log-input-1", + Source: requireNewStruct(t, map[string]interface{}{ + "paths": []interface{}{"/tmp/foo.log"}, + }), + }, + }, + }, + }, + }, + { + { + Id: "output-unit", + Type: proto.UnitType_OUTPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "default", + Type: "elasticsearch", + Name: "elasticsearch", + }, + }, + { + Id: "input-unit-1", + Type: proto.UnitType_INPUT, + ConfigStateIdx: 1, + State: proto.State_HEALTHY, + LogLevel: proto.UnitLogLevel_DEBUG, + Config: &proto.UnitExpectedConfig{ + Id: "log-input-2", + Type: "log", + Name: "log", + Streams: []*proto.Stream{ + { + Id: "log-input-2", + Source: requireNewStruct(t, map[string]interface{}{ + "paths": []interface{}{"/tmp/foo.log"}, + }), + }, + }, + }, + }, + }, + }, + []uint64{1, 1}, + []*proto.Features{ + nil, + nil, + }, + onObserved, + 500*time.Millisecond, + ) + require.NoError(t, srv.Start()) + defer srv.Stop() + + client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{ + Name: "program", + Version: "v1.0.0", + Meta: map[string]string{ + "key": "value", + }, + }, grpc.WithTransportCredentials(insecure.NewCredentials())) + + m, err := NewV2AgentManagerWithClient( + &Config{ + Enabled: true, + }, + r, + client, + WithChangeDebounce(300*time.Millisecond), + WithForceReloadDebounce(800*time.Millisecond), + ) + require.NoError(t, err) + + mm := m.(*BeatV2Manager) + + err = m.Start() + require.NoError(t, err) + defer m.Stop() + + forceReloadStates := []bool{false, true, false} + forceReloadStateIdx := 0 + forceReloadLastState := true // starts on true so the first iteration is already a change + + eventuallyCheck := func() bool { + forceReload := mm.forceReload + // That detects a state change, we only count/advance steps + // on state changes + if forceReload != forceReloadLastState { + forceReloadLastState = forceReload + if forceReload == forceReloadStates[forceReloadStateIdx] { + // Set to the next state + forceReloadStateIdx++ + } + + // If we went through all states, then succeed + if forceReloadStateIdx == len(forceReloadStates) { + // If we went through all states + if configIdx == 1 { + return true + } + } + } + + return false + } + + require.Eventually(t, eventuallyCheck, 20*time.Second, 300*time.Millisecond, + "the expected changes on forceReload did not happen") +} + +type reloadableListMock struct { + mx sync.Mutex + configs []*reload.ConfigWithMeta + ReloadImpl func(configs []*reload.ConfigWithMeta) error +} + +func (r *reloadableListMock) Reload(configs []*reload.ConfigWithMeta) error { + r.mx.Lock() + defer r.mx.Unlock() + return r.ReloadImpl(configs) +} + +func (r *reloadableListMock) Configs() []*reload.ConfigWithMeta { + r.mx.Lock() + defer r.mx.Unlock() + return r.configs +} diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 922170177d7..70db3fd88b0 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -6,6 +6,7 @@ package management import ( "context" + "errors" "fmt" "os" "os/signal" @@ -15,9 +16,12 @@ import ( "github.com/joeshaw/multierror" "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" gproto "google.golang.org/protobuf/proto" "gopkg.in/yaml.v2" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/features" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -49,9 +53,10 @@ type BeatV2Manager struct { errCanceller context.CancelFunc // track individual units given to us by the V2 API - mx sync.Mutex - units map[unitKey]*client.Unit - actions []client.Action + mx sync.Mutex + units map[unitKey]*client.Unit + actions []client.Action + forceReload bool // status is reported as a whole for every unit sent to this component // hopefully this can be improved in the future to be separated per unit @@ -84,6 +89,13 @@ type BeatV2Manager struct { lastBeatOutputCfg *reload.ConfigWithMeta lastBeatInputCfgs []*reload.ConfigWithMeta lastBeatFeaturesCfg *conf.C + + // changeDebounce is the debounce time for a configuration change + changeDebounce time.Duration + // forceReloadDebounce is the time the manager will wait before + // trying to reload the configuration after an input not finished error + // happens + forceReloadDebounce time.Duration } // ================================ @@ -95,6 +107,20 @@ func WithStopOnEmptyUnits(m *BeatV2Manager) { m.stopOnEmptyUnits = true } +// WithChangeDebounce sets the changeDeboung value +func WithChangeDebounce(d time.Duration) func(b *BeatV2Manager) { + return func(b *BeatV2Manager) { + b.changeDebounce = d + } +} + +// WithForceReloadDebounce sets the forceReloadDebounce value +func WithForceReloadDebounce(d time.Duration) func(b *BeatV2Manager) { + return func(b *BeatV2Manager) { + b.forceReloadDebounce = d + } +} + // ================================ // Init Functions // ================================ @@ -109,22 +135,38 @@ func init() { // This is registered as the manager factory in init() so that calls to // lbmanagement.NewManager will be forwarded here. func NewV2AgentManager(config *conf.C, registry *reload.Registry) (lbmanagement.Manager, error) { + logger := logp.NewLogger(lbmanagement.DebugK).Named("V2-manager") c := DefaultConfig() if config.Enabled() { if err := config.Unpack(&c); err != nil { return nil, fmt.Errorf("parsing fleet management settings: %w", err) } } - agentClient, _, err := client.NewV2FromReader(os.Stdin, client.VersionInfo{ - Name: "beat-v2-client", - Version: version.GetDefaultVersion(), - Meta: map[string]string{ - "commit": version.Commit(), - "build_time": version.BuildTime().String(), - }, - }) - if err != nil { - return nil, fmt.Errorf("error reading control config from agent: %w", err) + + var agentClient client.V2 + var err error + if c.InsecureGRPCURLForTesting != "" && c.Enabled { + // Insecure for testing Elastic-Agent-Client initialisation + logger.Info("Using INSECURE GRPC connection, this should be only used for testing!") + agentClient = client.NewV2(c.InsecureGRPCURLForTesting, + "", // Insecure connection for test, no token needed + client.VersionInfo{ + Name: "beat-v2-client-for-testing", + Version: version.GetDefaultVersion(), + }, grpc.WithTransportCredentials(insecure.NewCredentials())) + } else { + // Normal Elastic-Agent-Client initialisation + agentClient, _, err = client.NewV2FromReader(os.Stdin, client.VersionInfo{ + Name: "beat-v2-client", + Version: version.GetDefaultVersion(), + Meta: map[string]string{ + "commit": version.Commit(), + "build_time": version.BuildTime().String(), + }, + }) + if err != nil { + return nil, fmt.Errorf("error reading control config from agent: %w", err) + } } // officially running under the elastic-agent; we set the publisher pipeline @@ -150,6 +192,12 @@ func NewV2AgentManagerWithClient(config *Config, registry *reload.Registry, agen status: lbmanagement.Running, message: "Healthy", stopChan: make(chan struct{}, 1), + changeDebounce: time.Second, + // forceReloadDebounce is greater than changeDebounce because it is only + // used when an input has not reached its finished state, this means some events + // still need to be acked by the acker, hence the longer we wait the more likely + // for the input to have reached its finished state. + forceReloadDebounce: time.Second * 10, } if config.Enabled { @@ -378,15 +426,13 @@ func (cm *BeatV2Manager) watchErrChan(ctx context.Context) { } func (cm *BeatV2Manager) unitListen() { - const changeDebounce = 100 * time.Millisecond - // register signal handler sigc := make(chan os.Signal, 1) signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) // timer is used to provide debounce on unit changes // this allows multiple changes to come in and only a single reload be performed - t := time.NewTimer(changeDebounce) + t := time.NewTimer(cm.changeDebounce) t.Stop() // starts stopped, until a change occurs cm.logger.Debug("Listening for agent unit changes") @@ -411,7 +457,8 @@ func (cm *BeatV2Manager) unitListen() { return case change := <-cm.client.UnitChanges(): cm.logger.Infof( - "BeatV2Manager.unitListen UnitChanged.Type(%s), UnitChanged.Trigger(%d): %s/%s", + "BeatV2Manager.unitListen UnitChanged.ID(%s), UnitChanged.Type(%s), UnitChanged.Trigger(%d): %s/%s", + change.Unit.ID(), change.Type, int64(change.Triggers), change.Type, change.Triggers) switch change.Type { @@ -420,11 +467,11 @@ func (cm *BeatV2Manager) unitListen() { case client.UnitChangedAdded: cm.addUnit(change.Unit) // reset can be called here because `<-t.C` is handled in the same select - t.Reset(changeDebounce) + t.Reset(cm.changeDebounce) case client.UnitChangedModified: cm.modifyUnit(change.Unit) // reset can be called here because `<-t.C` is handled in the same select - t.Reset(changeDebounce) + t.Reset(cm.changeDebounce) case client.UnitChangedRemoved: cm.deleteUnit(change.Unit) } @@ -439,6 +486,10 @@ func (cm *BeatV2Manager) unitListen() { } cm.mx.Unlock() cm.reload(units) + if cm.forceReload { + // Restart the debounce timer so we try to reload the inputs. + t.Reset(cm.forceReloadDebounce) + } } } } @@ -642,15 +693,62 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*client.Unit) error { inputBeatCfgs = append(inputBeatCfgs, inputCfg...) } - if !didChange(cm.lastInputCfgs, inputCfgs) { + if !didChange(cm.lastInputCfgs, inputCfgs) && !cm.forceReload { cm.logger.Debug("Skipped reloading input units; configuration didn't change") return nil } - err := obj.Reload(inputBeatCfgs) - if err != nil { - return fmt.Errorf("failed to reloading inputs: %w", err) + if cm.forceReload { + cm.logger.Info("Reloading Beats inputs because forceReload is true. " + + "Set log level to debug to get more information about which " + + "inputs are causing this.") + } + + if err := obj.Reload(inputBeatCfgs); err != nil { + merror := &multierror.MultiError{} + realErrors := multierror.Errors{} + + // At the moment this logic is tightly bound to the current RunnerList + // implementation from libbeat/cfgfile/list.go and Input.loadStates from + // filebeat/input/log/input.go. + // If they change the way they report errors, this will break. + // TODO (Tiago): update all layers to use the most recent features from + // the standard library errors package. + if errors.As(err, &merror) { + for _, err := range merror.Errors { + causeErr := errors.Unwrap(err) + // A Log input is only marked as finished when all events it + // produced are acked by the acker so when we see this error, + // we just retry until the new input can be started. + // This is the same logic used by the standalone configuration file + // reloader implemented on libbeat/cfgfile/reload.go + inputNotFinishedErr := &common.ErrInputNotFinished{} + if ok := errors.As(causeErr, &inputNotFinishedErr); ok { + cm.logger.Debugf("file '%s' is not finished, will retry starting the input soon", inputNotFinishedErr.File) + cm.forceReload = true + cm.logger.Debug("ForceReload set to TRUE") + continue + } + + // This is an error that cannot be ignored, so we report it + realErrors = append(realErrors, err) + } + } + + if len(realErrors) != 0 { + return fmt.Errorf("failed to reload inputs: %w", realErrors.Err()) + } + } else { + // If there was no error reloading input and forceReload was + // true, then set it to false. This prevents unnecessary logging + // and makes it clear this was the moment when the input reload + // finally worked. + if cm.forceReload { + cm.forceReload = false + cm.logger.Debug("ForceReload set to FALSE") + } } + cm.lastInputCfgs = inputCfgs cm.lastBeatInputCfgs = inputBeatCfgs return nil diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 952263a43fb..538dd0c5802 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -25,7 +25,6 @@ import ( ) func TestManagerV2(t *testing.T) { - r := reload.NewRegistry() output := &reloadable{} @@ -239,7 +238,7 @@ func mockSrv( if observedCallback != nil { observedCallback(observed, i) } - matches := doesStateMatch(observed, units[i], featuresIdxs[i]) + matches := DoesStateMatch(observed, units[i], featuresIdxs[i]) if !matches { // send same set of units and features return &proto.CheckinExpected{ @@ -274,35 +273,6 @@ func mockSrv( } } -func doesStateMatch( - observed *proto.CheckinObserved, - expectedUnits []*proto.UnitExpected, - expectedFeaturesIdx uint64, -) bool { - if len(observed.Units) != len(expectedUnits) { - return false - } - expectedMap := make(map[unitKey]*proto.UnitExpected) - for _, exp := range expectedUnits { - expectedMap[unitKey{client.UnitType(exp.Type), exp.Id}] = exp - } - for _, unit := range observed.Units { - exp, ok := expectedMap[unitKey{client.UnitType(unit.Type), unit.Id}] - if !ok { - return false - } - if unit.State != exp.State || unit.ConfigStateIdx != exp.ConfigStateIdx { - return false - } - } - - if observed.FeaturesIdx != expectedFeaturesIdx { - return false - } - - return true -} - type reloadable struct { mx sync.Mutex config *reload.ConfigWithMeta diff --git a/x-pack/libbeat/management/test_utils.go b/x-pack/libbeat/management/test_utils.go new file mode 100644 index 00000000000..fba637154b5 --- /dev/null +++ b/x-pack/libbeat/management/test_utils.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package management + +import ( + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" +) + +// DoesStateMatch returns true if the observed state matches +// the expectedUnits and Features. +func DoesStateMatch( + observed *proto.CheckinObserved, + expectedUnits []*proto.UnitExpected, + expectedFeaturesIdx uint64, +) bool { + if len(observed.Units) != len(expectedUnits) { + return false + } + expectedMap := make(map[unitKey]*proto.UnitExpected) + for _, exp := range expectedUnits { + expectedMap[unitKey{client.UnitType(exp.Type), exp.Id}] = exp + } + for _, unit := range observed.Units { + exp, ok := expectedMap[unitKey{client.UnitType(unit.Type), unit.Id}] + if !ok { + return false + } + if unit.State != exp.State || unit.ConfigStateIdx != exp.ConfigStateIdx { + return false + } + } + + return observed.FeaturesIdx == expectedFeaturesIdx +}