Skip to content

Commit

Permalink
Properly shut down crawler in case one prospector is misconfigured
Browse files Browse the repository at this point in the history
If one prospector started to already send data and a second one was misconfigured, the beat paniced during shutdown. This is no prevented by properly shutting down the crawler also on error.

Closes #3917
  • Loading branch information
ruflin committed Apr 19, 2017
1 parent aec81e3 commit 511f67d
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v5.1.1...master[Check the HEAD diff]
- Downgrade Elasticsearch per batch item failure log to debug level. {issue}3953[3953]
- Allow log lines without a program name in the Syslog fileset. {pull}3944[3944]
- Fix panic in JSON decoding code if the input line is "null". {pull}4042[4042]
- Properly shut down crawler in case one prospector is misconfigured. {pull}4037[4037]

*Heartbeat*
- Add default ports in HTTP monitor. {pull}3924[3924]
Expand Down
1 change: 1 addition & 0 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {

err = crawler.Start(registrar, config.ProspectorReload)
if err != nil {
crawler.Stop()
return err
}

Expand Down
2 changes: 1 addition & 1 deletion filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Crawler) startProspector(config *common.Config, states []file.State) er

err = p.LoadStates(states)
if err != nil {
return fmt.Errorf("error loading states for propsector %v: %v", p.ID(), err)
return fmt.Errorf("error loading states for prospector %v: %v", p.ID(), err)
}

c.prospectors[p.ID()] = p
Expand Down
2 changes: 1 addition & 1 deletion filebeat/prospector/prospector.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (p *Prospector) Start() {
logp.Info("Prospector channel stopped")
return
case <-p.beatDone:
logp.Info("Prospector channel stopped")
logp.Info("Prospector channel stopped because beat is stopping.")
return
case event := <-p.harvesterChan:
// No stopping on error, because on error it is expected that beatDone is closed
Expand Down
4 changes: 4 additions & 0 deletions filebeat/prospector/prospector_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func NewLog(p *Prospector) (*Log, error) {
config: p.config,
}

if len(p.config.Paths) == 0 {
return nil, fmt.Errorf("each prospector must have at least one path defined")
}

return prospectorer, nil
}

Expand Down
1 change: 1 addition & 0 deletions filebeat/prospector/prospector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestProspectorFileExclude(t *testing.T) {

prospector := Prospector{
config: prospectorConfig{
Paths: []string{"test.log"},
ExcludeFiles: []match.Matcher{match.MustCompile(`\.gz$`)},
},
}
Expand Down
3 changes: 3 additions & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ filebeat.prospectors:
max_lines: {{ max_lines|default(500) }}
{% endif %}
{% endif %}
{% if prospector_raw %}
{{prospector_raw}}
{% endif %}

filebeat.spool_size:
filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }}
Expand Down
24 changes: 24 additions & 0 deletions filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,27 @@ def nasa_logs(self):
self.copy_files(["logs/nasa-50k.log"],
source_dir="../files",
target_dir="log")

def test_stopping_empty_path(self):
"""
Test filebeat stops properly when 1 prospector has an invalid config.
"""

prospector_raw = """
- input_type: log
paths: []
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
prospector_raw=prospector_raw,
)
filebeat = self.start_beat()
time.sleep(2)

# Wait until first flush
self.wait_until(
lambda: self.log_contains_count("No paths were defined for prospector") >= 1,
max_timeout=5)

filebeat.check_wait(exit_code=1)

0 comments on commit 511f67d

Please sign in to comment.