Skip to content

Commit

Permalink
Debounce input reload on autodiscover (#35645)
Browse files Browse the repository at this point in the history
The Kubernetes autodiscover feature now incorporates a debounce logic
when reloading inputs. By default, it waits for at least 1 second
before invoking the Reload method. In case of an error, it introduces
a 10-second delay before retrying.

The channel used for test synchronisation has been removed and tests
now use (assert/require).Eventually.

When Autodiscover calls `cfgfile.NewRunnerList` to instantiate a
RunnerList, it now specifies a different logger name, enabling more
granular log filtering.

Debug logs now provide information about the reasons for invoking
Reload.

Certain tests that perform sequential actions now utilise `require`
instead of `assert` to maintain a consistent state avoid cascading
failures.

Tests that required updates now leverage `require.Eventually` instead
of `wait`, providing additional information on failure causes.

Documentation for `cfgfile.RunnerList` has been improved to enhance
clarity.
  • Loading branch information
belimawr authored Jun 20, 2023
1 parent bdb67bc commit d270536
Show file tree
Hide file tree
Showing 7 changed files with 330 additions and 72 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]


*Filebeat*
- Fix input reload on autodiscover {issue}34388[34388] {pull}35645[35645]


*Heartbeat*
Expand Down
6 changes: 6 additions & 0 deletions dev-tools/mage/gotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,12 @@ func BuildSystemTestGoBinary(binArgs TestBinaryArgs) error {
"test", "-c",
"-o", binArgs.Name + ".test",
}

if DevBuild {
// Disable optimizations (-N) and inlining (-l) for debugging.
args = append(args, `-gcflags=all=-N -l`)
}

if TestCoverage {
args = append(args, "-coverpkg", "./...")
}
Expand Down
18 changes: 17 additions & 1 deletion filebeat/tests/system/test_autodiscover.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,23 @@ def _test(self, container):
with open(os.path.join(self.working_dir, f'{container.name}.log'), 'wb') as f:
f.write(b'Busybox output 1\n')

self.wait_until(lambda: self.log_contains('Starting runner: input'))
docker_client = docker.from_env()

def wait_container_start():
for i, c in enumerate(docker_client.containers.list()):
if c.name == container.name:
return True

# Ensure the container is running before checkging
# if the input is running
self.wait_until(
wait_container_start,
name="wait for test container",
err_msg="the test container is not running yet")

self.wait_until(lambda: self.log_contains('Starting runner: input'),
name="wait for input to start",
err_msg="did not find 'Starting runner: input' in the logs")
self.wait_until(lambda: self.output_has(lines=1))

output = self.read_output_json()
Expand Down
79 changes: 43 additions & 36 deletions libbeat/autodiscover/autodiscover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"fmt"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/autodiscover/meta"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
Expand All @@ -35,8 +33,8 @@ import (
)

const (
// If a config reload fails after a new event, a new reload will be run after this period
retryPeriod = 10 * time.Second
// defaultDebouncePeriod is the time autodiscover will wait before reloading inputs
defaultDebouncePeriod = time.Second
)

// EventConfigurer is used to configure the creation of configuration objects
Expand Down Expand Up @@ -65,10 +63,7 @@ type Autodiscover struct {
meta *meta.Map
listener bus.Listener
logger *logp.Logger

// workDone is a channel used for testing purpouses, to know when the worker has
// done some work.
workDone chan struct{}
debouncePeriod time.Duration
}

// NewAutodiscover instantiates and returns a new Autodiscover manager
Expand All @@ -90,7 +85,7 @@ func NewAutodiscover(
for _, providerCfg := range c.Providers {
provider, err := Registry.BuildProvider(name, bus, providerCfg, keystore)
if err != nil {
return nil, errors.Wrap(err, "error in autodiscover provider settings")
return nil, fmt.Errorf("error in autodiscover provider settings: %w", err)
}
logger.Debugf("Configured autodiscover provider: %s", provider)
providers = append(providers, provider)
Expand All @@ -102,10 +97,11 @@ func NewAutodiscover(
factory: factory,
configurer: configurer,
configs: map[string]map[uint64]*reload.ConfigWithMeta{},
runners: cfgfile.NewRunnerList("autodiscover", factory, pipeline),
runners: cfgfile.NewRunnerList("autodiscover.cfgfile", factory, pipeline),
providers: providers,
meta: meta.NewMap(),
logger: logger,
debouncePeriod: defaultDebouncePeriod,
}, nil
}

Expand All @@ -132,6 +128,7 @@ func (a *Autodiscover) Start() {

func (a *Autodiscover) worker() {
var updated, retry bool
t := time.NewTimer(defaultDebouncePeriod)

for {
select {
Expand All @@ -142,38 +139,48 @@ func (a *Autodiscover) worker() {
}

if _, ok := event["start"]; ok {
updated = a.handleStart(event)
// if updated is true, we don't want to set it back to false
if a.handleStart(event) {
updated = true
}
}
if _, ok := event["stop"]; ok {
updated = a.handleStop(event)
// if updated is true, we don't want to set it back to false
if a.handleStop(event) {
updated = true
}
}

case <-time.After(retryPeriod):
}
case <-t.C:
if updated || retry {
a.logger.Debugf("Reloading autodiscover configs reason: updated: %t, retry: %t", updated, retry)

if updated || retry {
if retry {
a.logger.Debug("Reloading existing autodiscover configs after error")
}
configs := []*reload.ConfigWithMeta{}
for _, list := range a.configs {
for _, c := range list {
configs = append(configs, c)
}
}

configs := []*reload.ConfigWithMeta{}
for _, list := range a.configs {
for _, c := range list {
configs = append(configs, c)
a.logger.Debugf("calling reload with %d config(s)", len(configs))
err := a.runners.Reload(configs)

// reset updated status
updated = false

// On error, make sure the next run also updates because some runners were not properly loaded
retry = err != nil
if retry {
// The recoverable errors that can lead to retry are related
// to the harvester state, so we need to give the publishing
// pipeline some time to finish flushing the events from that
// file. Hence we wait for 10x the normal debounce period.
t.Reset(10 * a.debouncePeriod)
continue
}
}

err := a.runners.Reload(configs)

// On error, make sure the next run also updates because some runners were not properly loaded
retry = err != nil
// reset updated status
updated = false
}

// For testing purpouses.
if a.workDone != nil {
a.workDone <- struct{}{}
t.Reset(a.debouncePeriod)
}
}
}
Expand Down Expand Up @@ -233,9 +240,9 @@ func (a *Autodiscover) handleStart(event bus.Event) bool {

err = a.factory.CheckConfig(config)
if err != nil {
a.logger.Error(errors.Wrap(err, fmt.Sprintf(
"Auto discover config check failed for config '%s', won't start runner",
conf.DebugString(config, true))))
a.logger.Errorf(
"Auto discover config check failed for config '%s', won't start runner, err: %s",
conf.DebugString(config, true), err)
continue
}
newCfg[hash] = &reload.ConfigWithMeta{
Expand Down
Loading

0 comments on commit d270536

Please sign in to comment.