From 76ef32ccde0571a34920569fc8c7c96f51c1c42d Mon Sep 17 00:00:00 2001 From: Nicolas Ruflin Date: Wed, 19 Apr 2017 10:37:34 +0200 Subject: [PATCH] Properly shut down crawler in case one prospector is misconfigured (#4037) If one prospector started to already send data and a second one was misconfigured, the beat paniced during shutdown. This is no prevented by properly shutting down the crawler also on error. Closes https://github.com/elastic/beats/issues/3917 (cherry picked from commit 95195cc855af9a345a67e160fc20c35529478081) --- CHANGELOG.asciidoc | 1 + filebeat/beater/filebeat.go | 1 + filebeat/crawler/crawler.go | 2 +- filebeat/prospector/prospector.go | 2 +- filebeat/prospector/prospector_log.go | 4 ++++ filebeat/prospector/prospector_test.go | 1 + filebeat/tests/system/config/filebeat.yml.j2 | 3 +++ filebeat/tests/system/test_shutdown.py | 24 ++++++++++++++++++++ 8 files changed, 36 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 9f33045b92a..28405fc7c86 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -32,6 +32,7 @@ https://github.com/elastic/beats/compare/v5.3.0...master[Check the HEAD diff] *Filebeat* - Fix modules default file permissions. {pull}3879[3879] +- Properly shut down crawler in case one prospector is misconfigured. {pull}4037[4037] *Heartbeat* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 0ac217ac156..282b6510a8c 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -181,6 +181,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { err = crawler.Start(registrar, config.ProspectorReload) if err != nil { + crawler.Stop() return err } diff --git a/filebeat/crawler/crawler.go b/filebeat/crawler/crawler.go index 9baf86f8084..1dfa8abaaa5 100644 --- a/filebeat/crawler/crawler.go +++ b/filebeat/crawler/crawler.go @@ -76,7 +76,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er err = p.LoadStates(states) if err != nil { - return fmt.Errorf("error loading states for propsector %v: %v", p.ID(), err) + return fmt.Errorf("error loading states for prospector %v: %v", p.ID(), err) } c.prospectors[p.ID()] = p diff --git a/filebeat/prospector/prospector.go b/filebeat/prospector/prospector.go index 398dcde286e..2e3a8420b54 100644 --- a/filebeat/prospector/prospector.go +++ b/filebeat/prospector/prospector.go @@ -134,7 +134,7 @@ func (p *Prospector) Start() { logp.Info("Prospector channel stopped") return case <-p.beatDone: - logp.Info("Prospector channel stopped") + logp.Info("Prospector channel stopped because beat is stopping.") return case event := <-p.harvesterChan: // No stopping on error, because on error it is expected that beatDone is closed diff --git a/filebeat/prospector/prospector_log.go b/filebeat/prospector/prospector_log.go index e696e30af3a..fb3c3304245 100644 --- a/filebeat/prospector/prospector_log.go +++ b/filebeat/prospector/prospector_log.go @@ -30,6 +30,10 @@ func NewProspectorLog(p *Prospector) (*ProspectorLog, error) { config: p.config, } + if len(p.config.Paths) == 0 { + return nil, fmt.Errorf("each prospector must have at least one path defined") + } + return prospectorer, nil } diff --git a/filebeat/prospector/prospector_test.go b/filebeat/prospector/prospector_test.go index d4e95795764..4876f34e7f6 100644 --- a/filebeat/prospector/prospector_test.go +++ b/filebeat/prospector/prospector_test.go @@ -28,6 +28,7 @@ func TestProspectorFileExclude(t *testing.T) { prospector := Prospector{ config: prospectorConfig{ + Paths: []string{"test.log"}, ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)}, }, } diff --git a/filebeat/tests/system/config/filebeat.yml.j2 b/filebeat/tests/system/config/filebeat.yml.j2 index ce19ea2a534..b2b5c4c0b35 100644 --- a/filebeat/tests/system/config/filebeat.yml.j2 +++ b/filebeat/tests/system/config/filebeat.yml.j2 @@ -70,6 +70,9 @@ filebeat.prospectors: max_lines: {{ max_lines|default(500) }} {% endif %} {% endif %} +{% if prospector_raw %} +{{prospector_raw}} +{% endif %} filebeat.spool_size: filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }} diff --git a/filebeat/tests/system/test_shutdown.py b/filebeat/tests/system/test_shutdown.py index 3ad8dc63f0d..a87ac65be78 100644 --- a/filebeat/tests/system/test_shutdown.py +++ b/filebeat/tests/system/test_shutdown.py @@ -146,3 +146,27 @@ def nasa_logs(self): self.copy_files(["logs/nasa-50k.log"], source_dir="../files", target_dir="log") + + def test_stopping_empty_path(self): + """ + Test filebeat stops properly when 1 prospector has an invalid config. + """ + + prospector_raw = """ +- input_type: log + paths: [] +""" + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + prospector_raw=prospector_raw, + ) + filebeat = self.start_beat() + time.sleep(2) + + # Wait until first flush + self.wait_until( + lambda: self.log_contains_count("No paths were defined for prospector") >= 1, + max_timeout=5) + + filebeat.check_wait(exit_code=1)