Commit
Fix input reload issue in Elastic-Agent with infinite retry logic (#35250)

This commit addresses the input reload issue in Elastic-Agent by introducing infinite retry logic in ManagerV2. The implemented logic mirrors the configuration reload behavior of a standalone Beat.

When reloading inputs, if at least one 'common.ErrInputNotFinished' error occurs, the 'forceReload' flag is set to true and the debounce timer is started. This process repeats until no 'common.ErrInputNotFinished' error is encountered.

Additionally, the 'changeDebounce' period is extended to 1 second, and the 'forceReloadDebounce' period is set to 10 times the 'changeDebounce' value.
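
To make the described behavior concrete, here is a minimal, self-contained Go sketch of the retry/debounce loop. The names reloadLoop and errInputNotFinished, the reload callback, and the use of a plain sleep instead of a timer are assumptions for illustration only; the actual implementation lives in ManagerV2 and is not part of this diff.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errInputNotFinished stands in for common.ErrInputNotFinished in this sketch.
var errInputNotFinished = errors.New("can only start an input when all related states are finished")

const (
	changeDebounce      = time.Second         // debounce for normal configuration changes
	forceReloadDebounce = 10 * changeDebounce // longer debounce when a reload must be retried
)

// reloadLoop keeps retrying reload() until it no longer fails with
// errInputNotFinished, mirroring the forceReload behavior described above.
func reloadLoop(reload func() error) {
	forceReload := false
	for {
		debounce := changeDebounce
		if forceReload {
			debounce = forceReloadDebounce
		}
		time.Sleep(debounce)

		err := reload()
		if err == nil {
			return // every input was reloaded successfully
		}
		if errors.Is(err, errInputNotFinished) {
			// At least one input still has unfinished states:
			// set forceReload and try again after the longer debounce.
			forceReload = true
			continue
		}
		// Any other error is not retried in this sketch.
		fmt.Println("reload failed:", err)
		return
	}
}

func main() {
	attempts := 0
	reloadLoop(func() error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("reloading inputs: %w", errInputNotFinished)
		}
		return nil
	})
	fmt.Println("inputs reloaded after", attempts, "attempts")
}

Because the debounce values above follow the commit message (1 second, and 10 times that for forced reloads), this example takes roughly 20 seconds to finish; shrink the constants when experimenting.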

---------

Co-authored-by: Blake Rouse <[email protected]>
Co-authored-by: Anderson Queiroz <[email protected]>
Co-authored-by: Denis <[email protected]>
(cherry picked from commit 137bc81)
belimawr authored and mergify[bot] committed Jun 1, 2023
1 parent ff53305 commit 0d08e88
Showing 12 changed files with 899 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -113,6 +113,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Sanitize filenames for request tracer in cel input. {pull}35154[35154]
- Fix accidental error overwrite in defer statement in entityanalytics Azure AD input. {issue}35153[35153] {pull}35169[35169]
- Fixing the grok expression outputs of log files {pull}35221[35221]
- Fixes "Can only start an input when all related states are finished" error when running under Elastic-Agent {pull}35250[35250] {issue}33653[33653]
- Move repeated Windows event channel not found errors in winlog input to debug level. {issue}35314[35314] {pull}35317[35317]
- Fix crash when processing forwarded logs missing a message. {issue}34705[34705] {pull}34865[34865]
- Fix crash when loading azurewebstorage cursor with no partially processed data. {pull}35433[35433]
2 changes: 1 addition & 1 deletion Jenkinsfile
@@ -707,7 +707,7 @@ def withBeatsEnv(Map args = [:], Closure body) {
error("Error '${err.toString()}'")
} finally {
if (archive) {
archiveArtifacts(allowEmptyArchive: true, artifacts: "${directory}/build/system-tests/docker-logs/TEST-docker-compose-*.log")
archiveArtifacts(allowEmptyArchive: true, artifacts: "${directory}/build/system-tests/docker-logs/TEST-docker-compose-*.log, ${directory}/build/integration-tests/**/**")
archiveTestOutput(directory: directory, testResults: testResults, artifacts: artifacts, id: args.id, upload: upload)
}
tearDown()
35 changes: 16 additions & 19 deletions filebeat/input/log/input.go
@@ -93,7 +93,9 @@ func NewInput(
cleanupNeeded := true
cleanupIfNeeded := func(f func() error) {
if cleanupNeeded {
f()
if err := f(); err != nil {
logp.L().Named("input.log").Errorf("clean up function returned an error: %w", err)
}
}
}

@@ -103,10 +105,10 @@ func NewInput(
return nil, err
}
if err := inputConfig.resolveRecursiveGlobs(); err != nil {
return nil, fmt.Errorf("Failed to resolve recursive globs in config: %v", err)
return nil, fmt.Errorf("Failed to resolve recursive globs in config: %w", err)
}
if err := inputConfig.normalizeGlobPatterns(); err != nil {
return nil, fmt.Errorf("Failed to normalize globs patterns: %v", err)
return nil, fmt.Errorf("Failed to normalize globs patterns: %w", err)
}

if len(inputConfig.Paths) == 0 {
@@ -115,7 +117,7 @@

identifier, err := file.NewStateIdentifier(inputConfig.FileIdentity)
if err != nil {
return nil, fmt.Errorf("failed to initialize file identity generator: %+v", err)
return nil, fmt.Errorf("failed to initialize file identity generator: %w", err)
}

// Note: underlying output.
@@ -192,7 +194,10 @@ func (p *Input) loadStates(states []file.State) error {

// In case an attempt is made to start an input with an unfinished state matching the glob pattern
if !state.Finished {
return &common.ErrInputNotFinished{State: state.String()}
return &common.ErrInputNotFinished{
State: state.String(),
File: state.Fileinfo.Name(),
}
}

// Convert state to current identifier if different
@@ -466,7 +471,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) {
var absolutePath string
absolutePath, err = filepath.Abs(path)
if err != nil {
return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %s", absolutePath, err)
return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %w", absolutePath, err)
}
p.logger.Debugf("Check file for harvesting: %s", absolutePath)
// Create new state for comparison
@@ -548,7 +553,7 @@ func (p *Input) scan() {
if isNewState {
logger.Debugf("Start harvester for new file: %s", newState.Source)
err := p.startHarvester(logger, newState, 0)
if err == errHarvesterLimit {
if errors.Is(err, errHarvesterLimit) {
logger.Debugf(harvesterErrMsg, newState.Source, err)
continue
}
@@ -673,11 +678,7 @@ func (p *Input) isIgnoreOlder(state file.State) bool {
}

modTime := state.Fileinfo.ModTime()
if time.Since(modTime) > p.config.IgnoreOlder {
return true
}

return false
return time.Since(modTime) > p.config.IgnoreOlder
}

// isCleanInactive checks if the given state falls under clean_inactive
@@ -688,11 +689,7 @@ func (p *Input) isCleanInactive(state file.State) bool {
}

modTime := state.Fileinfo.ModTime()
if time.Since(modTime) > p.config.CleanInactive {
return true
}

return false
return time.Since(modTime) > p.config.CleanInactive
}

// subOutletWrap returns a factory method that will wrap the passed outlet
Expand Down Expand Up @@ -748,7 +745,7 @@ func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int
err = h.Setup()
if err != nil {
p.numHarvesters.Dec()
return fmt.Errorf("error setting up harvester: %s", err)
return fmt.Errorf("error setting up harvester: %w", err)
}

// Update state before starting harvester
@@ -783,7 +780,7 @@ func (p *Input) updateState(state file.State) error {
stateToRemove := file.State{Id: state.PrevId, TTL: 0, Finished: true, Meta: nil}
err := p.doUpdate(stateToRemove)
if err != nil {
return fmt.Errorf("failed to remove outdated states based on prev_id: %v", err)
return fmt.Errorf("failed to remove outdated states based on prev_id: %w", err)
}
}

1 change: 1 addition & 0 deletions libbeat/common/errors.go
@@ -24,6 +24,7 @@ import (
// ErrInputNotFinished struct for reporting errors related to not finished inputs
type ErrInputNotFinished struct {
State string
File string
}

// Error method of ErrInputNotFinished
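
The new File field gives callers access to the path of the file whose state is still unfinished. The following is a hedged sketch of how a consumer might extract it with errors.As; the import path and the helper functions are assumptions for illustration, only the struct fields come from this diff.

package example

import (
	"errors"
	"fmt"

	"github.com/elastic/beats/v7/libbeat/common"
)

// filesWithUnfinishedStates collects the file names carried by any
// ErrInputNotFinished errors returned from a reload attempt.
func filesWithUnfinishedStates(errs []error) []string {
	var files []string
	for _, err := range errs {
		var notFinished *common.ErrInputNotFinished
		if errors.As(err, &notFinished) {
			files = append(files, notFinished.File)
		}
	}
	return files
}

// report prints which files are still blocking the reload; a caller such as
// the reload logic sketched earlier could use this to decide to retry.
func report(errs []error) {
	for _, f := range filesWithUnfinishedStates(errs) {
		fmt.Printf("input for %s still has unfinished states, will retry\n", f)
	}
}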
1 change: 1 addition & 0 deletions x-pack/filebeat/magefile.go
@@ -179,6 +179,7 @@ func IntegTest() {

// GoIntegTest starts the docker containers and executes the Go integration tests.
func GoIntegTest(ctx context.Context) error {
mg.Deps(BuildSystemTestBinary)
return devtools.GoIntegTestFromHost(ctx, devtools.DefaultGoTestIntegrationFromHostArgs())
}

136 changes: 136 additions & 0 deletions x-pack/filebeat/tests/integration/framework_test.go
@@ -0,0 +1,136 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration

package integration

import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type BeatProc struct {
Binary string
Args []string
Cmd *exec.Cmd
t *testing.T
tempDir string
}

// NewBeat creates a new Beat process from the system tests binary.
// It sets some required options like the home path, logging, etc.
// `tempDir` will be used as home and logs directory for the Beat
// `args` will be passed as CLI arguments to the Beat
func NewBeat(t *testing.T, binary string, tempDir string, args ...string) BeatProc {
p := BeatProc{
t: t,
Binary: binary,
Args: append([]string{
"--systemTest",
"--path.home", tempDir,
"--path.logs", tempDir,
"-E", "logging.to_files=true",
"-E", "logging.files.rotateeverybytes=104857600", // About 100MB
}, args...),
tempDir: tempDir,
}
return p
}

// Start starts the Beat process
func (b *BeatProc) Start() {
t := b.t
fullPath, err := filepath.Abs(b.Binary)
if err != nil {
t.Fatalf("could not get full path from %q, err: %s", b.Binary, err)
}
b.Cmd = exec.Command(fullPath, b.Args...)

if err := b.Cmd.Start(); err != nil {
t.Fatalf("could not start process: %s", err)
}
t.Cleanup(func() {
pid := b.Cmd.Process.Pid
if err := b.Cmd.Process.Kill(); err != nil {
t.Fatalf("could not stop process with PID: %d, err: %s", pid, err)
}
})
}

// LogContains looks for `s` as a substring of every log line.
// It opens the log file on every call, reads it until EOF,
// then closes it.
func (b *BeatProc) LogContains(s string) bool {
t := b.t
logFile := b.openLogFile()
defer func() {
if err := logFile.Close(); err != nil {
// That's not quite a test error, but it can impact
// next executions of LogContains, so treat it as an error
t.Errorf("could not close log file: %s", err)
}
}()

r := bufio.NewReader(logFile)
for {
line, err := r.ReadString('\n')
if err != nil {
if err != io.EOF {
t.Fatalf("error reading log file '%s': %s", logFile.Name(), err)
}
break
}
if strings.Contains(line, s) {
return true
}
}

return false
}

// openLogFile opens the log file for reading and returns it.
// It also registers a cleanup function to close the file
// when the test ends.
func (b *BeatProc) openLogFile() *os.File {
t := b.t
glob := fmt.Sprintf("%s-*.ndjson", filepath.Join(b.tempDir, "filebeat"))
files, err := filepath.Glob(glob)
if err != nil {
t.Fatalf("could not expand log file glob: %s", err)
}

require.Eventually(t, func() bool {
files, err = filepath.Glob(glob)
if err != nil {
t.Fatalf("could not expand log file glob: %s", err)
}
return len(files) == 1
}, 5*time.Second, 100*time.Millisecond,
"waiting for log file matching glob '%s' to be created", glob)

// On normal operation there must be a single log file; if there is more
// than one, then there is an issue and the Beat is logging too much,
// which is enough to stop the test
if len(files) != 1 {
t.Fatalf("there must be only one log file for %s, found: %d",
glob, len(files))
}

f, err := os.Open(files[0])
if err != nil {
t.Fatalf("could not open log file '%s': %s", files[0], err)
}

return f
}
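
For context, here is a hypothetical test built on the helper above. The binary path, the configuration contents, and the log line being waited for are placeholders rather than part of this commit; the real integration tests added in this PR live alongside this file.

//go:build integration

package integration

import (
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestFilebeatStartsAndLogs(t *testing.T) {
	tempDir := t.TempDir()

	// Minimal filebeat configuration written into the temporary home directory.
	// The input ID, paths, and output are illustrative only.
	cfg := "filebeat.inputs:\n" +
		"  - type: filestream\n" +
		"    id: test-input\n" +
		"    paths:\n" +
		"      - " + filepath.Join(tempDir, "*.log") + "\n" +
		"output.console.enabled: true\n"
	cfgPath := filepath.Join(tempDir, "filebeat.yml")
	require.NoError(t, os.WriteFile(cfgPath, []byte(cfg), 0o644))

	// "../../filebeat.test" is a placeholder for the system tests binary
	// built by mage (see the BuildSystemTestBinary dependency in the magefile).
	filebeat := NewBeat(t, "../../filebeat.test", tempDir, "-c", cfgPath)
	filebeat.Start()

	// Poll the Beat's own log file until the expected line shows up. The
	// message below is a placeholder; substitute a line you expect the Beat
	// to log in your scenario.
	require.Eventually(t, func() bool {
		return filebeat.LogContains("Home path")
	}, 30*time.Second, 100*time.Millisecond,
		"expected log line never appeared")
}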