Skip to content

Commit

Permalink
Properly shut down crawler in case one prospector is misconfigured (e…
Browse files Browse the repository at this point in the history
…lastic#4037)

If one prospector started to already send data and a second one was misconfigured, the beat paniced during shutdown. This is no prevented by properly shutting down the crawler also on error.

Closes elastic#3917
  • Loading branch information
ruflin authored and exekias committed Apr 19, 2017
1 parent aec81e3 commit 95195cc
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Downgrade Elasticsearch per batch item failure log to debug level. {issue}3953[3953]
- Allow log lines without a program name in the Syslog fileset. {pull}3944[3944]
- Fix panic in JSON decoding code if the input line is "null". {pull}4042[4042]
- Properly shut down crawler in case one prospector is misconfigured. {pull}4037[4037]

*Heartbeat*
- Add default ports in HTTP monitor. {pull}3924[3924]
Expand Down
1 change: 1 addition & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

err = crawler.Start(registrar, config.ProspectorReload)
if err != nil {
crawler.Stop()
return err
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er

err = p.LoadStates(states)
if err != nil {
return fmt.Errorf("error loading states for propsector %v: %v", p.ID(), err)
return fmt.Errorf("error loading states for prospector %v: %v", p.ID(), err)
}

c.prospectors[p.ID()] = p
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (p *Prospector) Start() {
logp.Info("Prospector channel stopped")
return
case <-p.beatDone:
logp.Info("Prospector channel stopped")
logp.Info("Prospector channel stopped because beat is stopping.")
return
case event := <-p.harvesterChan:
// No stopping on error, because on error it is expected that beatDone is closed
Expand Down
4 changes: 4 additions & 0 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func NewLog(p *Prospector) (*Log, error) {
config: p.config,
}

if len(p.config.Paths) == 0 {
return nil, fmt.Errorf("each prospector must have at least one path defined")
}

return prospectorer, nil
}

Expand Down
1 change: 1 addition & 0 deletions filebeat/prospector/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestProspectorFileExclude(t *testing.T) {

prospector := Prospector{
config: prospectorConfig{
Paths: []string{"test.log"},
ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)},
},
}
Expand Down
3 changes: 3 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ filebeat.prospectors:
max_lines: {{ max_lines|default(500) }}
{% endif %}
{% endif %}
{% if prospector_raw %}
{{prospector_raw}}
{% endif %}

filebeat.spool_size:
filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }}
Expand Down
24 changes: 24 additions & 0 deletions filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,27 @@ def nasa_logs(self):
self.copy_files(["logs/nasa-50k.log"],
source_dir="../files",
target_dir="log")

def test_stopping_empty_path(self):
"""
Test filebeat stops properly when 1 prospector has an invalid config.
"""

prospector_raw = """
- input_type: log
paths: []
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
prospector_raw=prospector_raw,
)
filebeat = self.start_beat()
time.sleep(2)

# Wait until first flush
self.wait_until(
lambda: self.log_contains_count("No paths were defined for prospector") >= 1,
max_timeout=5)

filebeat.check_wait(exit_code=1)

0 comments on commit 95195cc

Please sign in to comment.