Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix input reload under Elastic-Agent #35250

Merged
merged 47 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
68df822
Fix input reload under Elastic-Agent
belimawr Apr 27, 2023
7fce900
PR improvements
belimawr Apr 28, 2023
4d0732b
Address lint issues
belimawr Apr 28, 2023
37cfef3
Implement tests
belimawr Apr 28, 2023
2ba009a
Remove test debug logs
belimawr May 2, 2023
66176c8
fmt and add lisence headers
belimawr May 2, 2023
b31bd28
make lint happy again
belimawr May 2, 2023
c902ab8
Add changelog
belimawr May 2, 2023
dd0a47e
improve log entries
belimawr May 3, 2023
0b36d19
Clean up tests
belimawr May 3, 2023
71bd239
PR improvements
belimawr May 5, 2023
4d2e32d
[WIP] Integration tests
belimawr May 5, 2023
a1ac3bb
improve tests, move files to Filebeat
belimawr May 5, 2023
b149c85
clean up tests
belimawr May 5, 2023
dda54d9
[WIP] Use log file for Filebeat
belimawr May 5, 2023
d3e45a2
clean up integration tests
belimawr May 8, 2023
091dd31
Merge remote-tracking branch 'upstream/main' into fix-input-reload-un…
belimawr May 8, 2023
e65618b
debounce and force reload times are configurable
belimawr May 8, 2023
b3028a7
clean up
belimawr May 8, 2023
78930f0
make linter happy
belimawr May 8, 2023
0049bd1
try uploading test logs on failure
belimawr May 8, 2023
ea5d84f
fix archiveArtifacts path
belimawr May 9, 2023
f12f5e9
debugging CI
belimawr May 9, 2023
cff80ed
refactor the way logs are read.
belimawr May 9, 2023
04a87d0
More CI debugging.
belimawr May 9, 2023
3a299a6
Apply suggestions from code review
belimawr May 10, 2023
2424579
PR review changes
belimawr May 10, 2023
892e4ed
more debug logs for CI
belimawr May 10, 2023
5f44afb
use `bufio.Reader` instead of `bufio.Scanner`
belimawr May 10, 2023
c153d74
typo
belimawr May 10, 2023
12a0ba5
clean up after all fixes/debugging
belimawr May 11, 2023
a677530
Final PR improvements.
belimawr May 11, 2023
1ffe0b6
Apply suggestions from code review
belimawr May 12, 2023
dc147b1
Update x-pack/filebeat/tests/integration/framework_test.go
belimawr May 12, 2023
9ea251a
Update x-pack/filebeat/tests/integration/framework_test.go
belimawr May 12, 2023
4c2fcd7
PR improvements
belimawr May 12, 2023
f1bf065
PR improvements
belimawr May 12, 2023
0f67fb2
Apply suggestions from code review
belimawr May 17, 2023
c6a74e9
fix typos
belimawr May 17, 2023
efd1811
improve comments
belimawr May 17, 2023
49f314d
stop logging gorourine when test finishes
belimawr May 17, 2023
e9ce324
Apply suggestions from code review
belimawr May 17, 2023
d36f486
Update x-pack/libbeat/management/managerV2.go
belimawr May 17, 2023
a98f0a9
use env vars with defaults for credentials
belimawr May 30, 2023
8eece56
replace uses of `Cause` by `errors.Unwrap`
belimawr May 30, 2023
844194a
Update x-pack/filebeat/tests/integration/input_reload_test.go
belimawr May 31, 2023
a37a275
Update x-pack/libbeat/management/managerV2.go
belimawr May 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ https://github.com/elastic/beats/compare/v8.7.1\...main[Check the HEAD diff]
- Sanitize filenames for request tracer in cel input. {pull}35154[35154]
- Fix accidental error overwrite in defer statement in entityanalytics Azure AD input. {issue}35153[35153] {pull}35169[35169]
- Fixing the grok expression outputs of log files {pull}35221[35221]
- Fixes "Can only start an input when all related states are finished" error when running under Elastic-Agent {pull}35250[35250] {issue}33653[33653]
- Move repeated Windows event channel not found errors in winlog input to debug level. {issue}35314[35314] {pull}35317[35317]
- Fix crash when processing forwarded logs missing a message. {issue}34705[34705] {pull}34865[34865]

Expand Down
2 changes: 1 addition & 1 deletion Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ def withBeatsEnv(Map args = [:], Closure body) {
error("Error '${err.toString()}'")
} finally {
if (archive) {
archiveArtifacts(allowEmptyArchive: true, artifacts: "${directory}/build/system-tests/docker-logs/TEST-docker-compose-*.log")
archiveArtifacts(allowEmptyArchive: true, artifacts: "${directory}/build/system-tests/docker-logs/TEST-docker-compose-*.log, ${directory}/build/integration-tests/**/**")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was it just for debugging or we still need it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this test ever fails on CI that will be needed. I believe we should keep it there. Nobody investigating a test failure will remember to re-enable that, run it again then disable it.

I'm trying to make it as easy as possible to find and fix anything that could make this test fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the tests output the junit test report that CI uses to generate the test report? The unit tests do this, there probably isn't a reason the integration tests here couldn't either.

This is all controlled through the mage Go Test wrapper so if you are using that you are set.

https://github.com/belimawr/beats/blob/d36f48672653082fd5a8e27795726fb87c733be2/dev-tools/mage/gotest.go#L211

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get how it could help. What I am trying to do here is to make the logs from the Filebeat test binary available to download on CI, so if the test fails we can see why Filebeat failed/did not behave as expected.

archiveTestOutput(directory: directory, testResults: testResults, artifacts: artifacts, id: args.id, upload: upload)
}
tearDown()
Expand Down
35 changes: 16 additions & 19 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ func NewInput(
cleanupNeeded := true
cleanupIfNeeded := func(f func() error) {
if cleanupNeeded {
f()
if err := f(); err != nil {
logp.L().Named("input.log").Errorf("clean up function returned an error: %w", err)
}
belimawr marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -103,10 +105,10 @@ func NewInput(
return nil, err
}
if err := inputConfig.resolveRecursiveGlobs(); err != nil {
return nil, fmt.Errorf("Failed to resolve recursive globs in config: %v", err)
return nil, fmt.Errorf("Failed to resolve recursive globs in config: %w", err)
}
if err := inputConfig.normalizeGlobPatterns(); err != nil {
return nil, fmt.Errorf("Failed to normalize globs patterns: %v", err)
return nil, fmt.Errorf("Failed to normalize globs patterns: %w", err)
}

if len(inputConfig.Paths) == 0 {
Expand All @@ -115,7 +117,7 @@ func NewInput(

identifier, err := file.NewStateIdentifier(inputConfig.FileIdentity)
if err != nil {
return nil, fmt.Errorf("failed to initialize file identity generator: %+v", err)
return nil, fmt.Errorf("failed to initialize file identity generator: %w", err)
}

// Note: underlying output.
Expand Down Expand Up @@ -192,7 +194,10 @@ func (p *Input) loadStates(states []file.State) error {

// In case a input is tried to be started with an unfinished state matching the glob pattern
if !state.Finished {
return &common.ErrInputNotFinished{State: state.String()}
return &common.ErrInputNotFinished{
State: state.String(),
File: state.Fileinfo.Name(),
}
}

// Convert state to current identifier if different
Expand Down Expand Up @@ -466,7 +471,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) {
var absolutePath string
absolutePath, err = filepath.Abs(path)
if err != nil {
return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %s", absolutePath, err)
return file.State{}, fmt.Errorf("could not fetch abs path for file %s: %w", absolutePath, err)
}
p.logger.Debugf("Check file for harvesting: %s", absolutePath)
// Create new state for comparison
Expand Down Expand Up @@ -548,7 +553,7 @@ func (p *Input) scan() {
if isNewState {
logger.Debugf("Start harvester for new file: %s", newState.Source)
err := p.startHarvester(logger, newState, 0)
if err == errHarvesterLimit {
if errors.Is(err, errHarvesterLimit) {
logger.Debugf(harvesterErrMsg, newState.Source, err)
continue
}
Expand Down Expand Up @@ -673,11 +678,7 @@ func (p *Input) isIgnoreOlder(state file.State) bool {
}

modTime := state.Fileinfo.ModTime()
if time.Since(modTime) > p.config.IgnoreOlder {
return true
}

return false
return time.Since(modTime) > p.config.IgnoreOlder
}

// isCleanInactive checks if the given state false under clean_inactive
Expand All @@ -688,11 +689,7 @@ func (p *Input) isCleanInactive(state file.State) bool {
}

modTime := state.Fileinfo.ModTime()
if time.Since(modTime) > p.config.CleanInactive {
return true
}

return false
return time.Since(modTime) > p.config.CleanInactive
}

// subOutletWrap returns a factory method that will wrap the passed outlet
Expand Down Expand Up @@ -748,7 +745,7 @@ func (p *Input) startHarvester(logger *logp.Logger, state file.State, offset int
err = h.Setup()
if err != nil {
p.numHarvesters.Dec()
return fmt.Errorf("error setting up harvester: %s", err)
return fmt.Errorf("error setting up harvester: %w", err)
}

// Update state before staring harvester
Expand Down Expand Up @@ -783,7 +780,7 @@ func (p *Input) updateState(state file.State) error {
stateToRemove := file.State{Id: state.PrevId, TTL: 0, Finished: true, Meta: nil}
err := p.doUpdate(stateToRemove)
if err != nil {
return fmt.Errorf("failed to remove outdated states based on prev_id: %v", err)
return fmt.Errorf("failed to remove outdated states based on prev_id: %w", err)
}
}

Expand Down
1 change: 1 addition & 0 deletions libbeat/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
// ErrInputNotFinished struct for reporting errors related to not finished inputs
type ErrInputNotFinished struct {
State string
File string
}

// Error method of ErrInputNotFinished
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func IntegTest() {

// GoIntegTest starts the docker containers and executes the Go integration tests.
func GoIntegTest(ctx context.Context) error {
mg.Deps(BuildSystemTestBinary)
return devtools.GoIntegTestFromHost(ctx, devtools.DefaultGoTestIntegrationFromHostArgs())
}

Expand Down
136 changes: 136 additions & 0 deletions x-pack/filebeat/tests/integration/framework_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration

package integration
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file is the beginning of a integration tests framework in Go, much like our current Python system tests.

I kept it very simple and only implemented the bare minimum to write tests for this PR, hence it's not even on its own package. Whenever we need to use on a different test we can extract it into another package and improve it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a particular reason to put it under the x-pack path?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, all Elastic-Agent related code is here, which includes the manager who will interact with this "framework". Even it it is outside x-pack, it will require x-pack code to work/be used.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This file doesn't look like it depends on anything that is x-pack licensed, but the test itself definitely does.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider moving this to an Apache license out of the x-pack directory so it can be reused by the OSS parts of Beats.


import (
"bufio"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)

type BeatProc struct {
Binary string
Args []string
Cmd *exec.Cmd
t *testing.T
tempDir string
}

// NewBeat createa a new Beat process from the system tests binary.
// It sets some required options like the home path, logging, etc.
// `tempDir` will be used as home and logs directory for the Beat
// `args` will be passed as CLI arguments to the Beat
func NewBeat(t *testing.T, binary string, tempDir string, args ...string) BeatProc {
p := BeatProc{
t: t,
Binary: binary,
Args: append([]string{
"--systemTest",
"--path.home", tempDir,
"--path.logs", tempDir,
"-E", "logging.to_files=true",
"-E", "logging.files.rotateeverybytes=104857600", // About 100MB
}, args...),
tempDir: tempDir,
}
return p
}

// Start starts the Beat process
func (b *BeatProc) Start() {
t := b.t
fullPath, err := filepath.Abs(b.Binary)
if err != nil {
t.Fatalf("could not get full path from %q, err: %s", b.Binary, err)
}
b.Cmd = exec.Command(fullPath, b.Args...)

if err := b.Cmd.Start(); err != nil {
t.Fatalf("could not start process: %s", err)
}
t.Cleanup(func() {
pid := b.Cmd.Process.Pid
if err := b.Cmd.Process.Kill(); err != nil {
t.Fatalf("could not stop process with PID: %d, err: %s", pid, err)
}
})
}

// LogContains looks for `s` as a substring of every log line,
// it will open the log file on every call, read it until EOF,
// then close it.
func (b *BeatProc) LogContains(s string) bool {
t := b.t
logFile := b.openLogFile()
defer func() {
if err := logFile.Close(); err != nil {
// That's not quite a test error, but it can impact
// next executions of LogContains, so treat it as an error
t.Errorf("could not close log file: %s", err)
}
}()

r := bufio.NewReader(logFile)
for {
line, err := r.ReadString('\n')
if err != nil {
if err != io.EOF {
t.Fatalf("error reading log file '%s': %s", logFile.Name(), err)
}
break
}
if strings.Contains(line, s) {
return true
}
}

return false
}

// openLogFile opens the log file for reading and returns it.
// It also registers a cleanup function to close the file
// when the test ends.
func (b *BeatProc) openLogFile() *os.File {
t := b.t
AndersonQ marked this conversation as resolved.
Show resolved Hide resolved
glob := fmt.Sprintf("%s-*.ndjson", filepath.Join(b.tempDir, "filebeat"))
files, err := filepath.Glob(glob)
if err != nil {
t.Fatalf("could not expand log file glob: %s", err)
}

require.Eventually(t, func() bool {
files, err = filepath.Glob(glob)
if err != nil {
t.Fatalf("could not expand log file glob: %s", err)
}
return len(files) == 1
}, 5*time.Second, 100*time.Millisecond,
"waiting for log file matching glob '%s' to be created", glob)

// On a normal operation there must be a single log, if there are more
// than one, then there is an issue and the Beat is logging too much,
// which is enough to stop the test
if len(files) != 1 {
t.Fatalf("there must be only one log file for %s, found: %d",
glob, len(files))
}

f, err := os.Open(files[0])
if err != nil {
t.Fatalf("could not open log file '%s': %s", files[0], err)
}

return f
}
Loading