Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[libbeat | filebeat] Log error when parsing config block and disabled input on filebeat #30534

Merged
merged 12 commits into from
Mar 2, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...main[Check the HEAD dif
- Fix the ability for subcommands to be ran properly from the beats containers. {pull}30452[30452]
- Update docker/distribution dependency library to fix a security issues concerning OCI Manifest Type Confusion Issue. {pull}30462[30462]
- Fixes Beats crashing when glibc >= 2.35 is used {issue}30576[30576]
- Log errors when parsing and applying config blocks and if the input is disabled. {pull}30534[30534]

*Auditbeat*

Expand Down
31 changes: 19 additions & 12 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,30 +68,28 @@ func (c *crawler) Start(
) error {
log := c.log

log.Infof("Loading Inputs: %v", len(c.inputConfigs))
log.Infof("Loading Inputs: %d", len(c.inputConfigs))

// Prospect the globs/paths given on the command line and launch harvesters
for _, inputConfig := range c.inputConfigs {
err := c.startInput(pipeline, inputConfig)
if err != nil {
return fmt.Errorf("starting input failed: %+v", err)
return fmt.Errorf("starting input failed: %w", err)
}
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return fmt.Errorf("creating input reloader failed: %+v", err)
return fmt.Errorf("creating input reloader failed: %w", err)
}

}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return fmt.Errorf("creating module reloader failed: %+v", err)
return fmt.Errorf("creating module reloader failed: %w", err)
}

}

if c.inputReloader != nil {
Expand All @@ -105,7 +103,7 @@ func (c *crawler) Start(
}()
}

log.Infof("Loading and starting Inputs completed. Enabled inputs: %v", len(c.inputs))
log.Infof("Loading and starting Inputs completed. Enabled inputs: %d", len(c.inputs))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndersonQ The len() is a good start but let's be more precise like having a detailed count for each input. IE. Filestream input: 2, same comment for this log statement.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm logging the input config keys now on startInput now.
The inputs field doesn't have much information to log, it's an implementation of Runner :/

return nil
}
Expand All @@ -114,23 +112,32 @@ func (c *crawler) startInput(
pipeline beat.PipelineConnector,
config *common.Config,
) error {
// TODO: Either use debug or remove it after https://github.com/elastic/beats/pull/30534
// is fixed.
c.log.Infof("starting input, keys present on the config: %v",
config.FlattenedKeys())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++ for not leaking senstitive information.


if !config.Enabled() {
c.log.Infof("input disabled, skipping it")
return nil
}

var h map[string]interface{}
config.Unpack(&h)
err := config.Unpack(&h)
if err != nil {
return fmt.Errorf("could not unpack config: %w", err)
}
id, err := hashstructure.Hash(h, nil)
if err != nil {
return fmt.Errorf("can not compute id from configuration: %v", err)
return fmt.Errorf("can not compute id from configuration: %w", err)
}
if _, ok := c.inputs[id]; ok {
return fmt.Errorf("input with same ID already exists: %v", id)
return fmt.Errorf("input with same ID already exists: %d", id)
}

runner, err := c.inputsFactory.Create(pipeline, config)
if err != nil {
return fmt.Errorf("error while initializing input: %v", err)
return fmt.Errorf("error while initializing input: %w", err)
}
if inputRunner, ok := runner.(*input.Runner); ok {
inputRunner.Once = c.once
Expand All @@ -155,7 +162,7 @@ func (c *crawler) Stop() {
}()
}

logp.Info("Stopping %v inputs", len(c.inputs))
logp.Info("Stopping %d inputs", len(c.inputs))
// Stop inputs in parallel
for id, p := range c.inputs {
id, p := id, p
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (h *Harvester) Run() error {
}
}(h.state.Source)

logger.Info("Harvester started for file.")
logger.Infof("Harvester started for paths: %v", h.config.Paths)

h.doneWg.Add(1)
go func() {
Expand Down
10 changes: 6 additions & 4 deletions filebeat/tests/system/test_stdin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ def test_stdin(self):

proc = self.start_beat()

msg = "Harvester started"
self.wait_until(
lambda: self.log_contains(
"Harvester started for file."),
max_timeout=10)
lambda: self.log_contains(msg),
max_timeout=10,
err_msg=f"did not find '{msg}' in the logs")

iterations1 = 5
for n in range(0, iterations1):
Expand Down Expand Up @@ -102,4 +103,5 @@ def test_stdin_is_exclusive(self):

filebeat = self.start_beat()
filebeat.check_wait(exit_code=1)
assert self.log_contains("Exiting: stdin requires to be run in exclusive mode, configured inputs: stdin, udp")
msg = "Exiting: stdin requires to be run in exclusive mode, configured inputs: stdin, udp"
assert self.log_contains(msg), f"did not find '{msg}' in the logs"
4 changes: 2 additions & 2 deletions libbeat/tests/system/beat/beat.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def setUp(self):
# running tests in parallel
pass

def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond", err_msg=""):
"""
TODO: this can probably be a "wait_until_output_count", among other things, since that could actually use `self`, and this can become an internal function
Waits until the cond function returns true,
Expand All @@ -427,7 +427,7 @@ def wait_until(self, cond, max_timeout=10, poll_interval=0.1, name="cond"):
while not cond():
if datetime.now() - start > timedelta(seconds=max_timeout):
raise WaitTimeoutError(
f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds.")
f"Timeout waiting for condition '{name}'. Waited {max_timeout} seconds: {err_msg}")
time.sleep(poll_interval)

def wait_until_output_has_key(self, key: str, max_timeout=15):
Expand Down
6 changes: 4 additions & 2 deletions x-pack/libbeat/management/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func (cm *Manager) OnConfig(s string) {
if errs := cm.apply(blocks); errs != nil {
// `cm.apply` already logs the errors; currently allow beat to run degraded
cm.updateStatusWithError(err)
cm.logger.Errorf("failed applying config blocks: %v", err)
return
}

Expand Down Expand Up @@ -256,8 +257,8 @@ func (cm *Manager) apply(blocks ConfigBlocks) error {
}

// Unset missing configs
for name := range missing {
if missing[name] {
for name, isMissing := range missing {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 nice change.

if isMissing {
if err := cm.reload(name, []*ConfigBlock{}); err != nil {
errors = multierror.Append(errors, err)
}
Expand Down Expand Up @@ -319,6 +320,7 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (ConfigBlocks, error) {
for _, regName := range cm.registry.GetRegisteredNames() {
iBlock, err := cfg.GetValue(regName)
if err != nil {
cm.logger.Errorf("failed to get '%s' from config: %v. Continuing to next one", regName, err)
continue
}

Expand Down