Skip to content

Commit

Permalink
Filebeat: Fix random error on harvester close (elastic#21048) (elasti…
Browse files Browse the repository at this point in the history
…c#21056)

This fixes a race condition when a harvester is closed at the same time
that its source file size is being calculated.

Before this fix, `Error updating file size` errors are randomly printed, because
the file handle becomes closed inside the os.Stat call.

(cherry picked from commit 918f17a)
  • Loading branch information
adriansr authored Sep 14, 2020
1 parent f887f55 commit 6aeff15
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fixed `cloudfoundry.access` to have the correct `cloudfoundry.app.id` contents. {pull}17847[17847]
- Fixing `ingress_controller.` fields to be of type keyword instead of text. {issue}17834[17834]
- Fixed typo in log message. {pull}17897[17897]
- Fix an error updating file size being logged when EOF is reached. {pull}21048[21048]

*Heartbeat*

Expand Down
11 changes: 9 additions & 2 deletions filebeat/input/log/harvester.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ type Harvester struct {

// shutdown handling
done chan struct{}
doneWg *sync.WaitGroup
stopOnce sync.Once
stopWg *sync.WaitGroup
stopLock sync.Mutex
Expand Down Expand Up @@ -138,6 +139,7 @@ func NewHarvester(
publishState: publishState,
done: make(chan struct{}),
stopWg: &sync.WaitGroup{},
doneWg: &sync.WaitGroup{},
id: id,
outletFactory: outletFactory,
}
Expand Down Expand Up @@ -296,7 +298,11 @@ func (h *Harvester) Run() error {

logp.Info("Harvester started for file: %s", h.state.Source)

go h.monitorFileSize()
h.doneWg.Add(1)
go func() {
h.monitorFileSize()
h.doneWg.Done()
}()

for {
select {
Expand Down Expand Up @@ -375,7 +381,8 @@ func (h *Harvester) monitorFileSize() {
func (h *Harvester) stop() {
h.stopOnce.Do(func() {
close(h.done)

// Wait for goroutines monitoring h.done to terminate before closing source.
h.doneWg.Wait()
filesMetrics.Remove(h.id.String())
})
}
Expand Down

0 comments on commit 6aeff15

Please sign in to comment.