From 27f045afe89db04420e51b6999e31993538a776f Mon Sep 17 00:00:00 2001 From: Adrian Serrano Date: Fri, 13 Apr 2018 16:32:42 +0200 Subject: [PATCH] Filebeat: Fix leak in log harvester (#6797) (#6829) This patch slightly reorganizes how the log harvester works, so that suboutlets are only created when the harvester is ready to use them (inside Run()), instead of being passed to the constructor. This prevents a memory leak caused by some internal goroutines not stopping if the harvester Setup() fails, for example when files cannot be read. Fixes #6797 --- CHANGELOG.asciidoc | 1 + filebeat/input/log/harvester.go | 36 +++++++++++++++++++-------------- filebeat/input/log/input.go | 15 ++++++++++++-- filebeat/input/stdin/input.go | 4 +++- 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 443595e0ba0..538e3e359d5 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di *Filebeat* - Fix panic when log prospector configuration fails to load. {issue}6800[6800] +- Fix memory leak in log prospector when files cannot be read. 
{issue}6797[6797] *Heartbeat* diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 48a48c16cb3..af813d43011 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -52,6 +52,9 @@ var ( ErrClosed = errors.New("reader closed") ) +// OutletFactory provides an outlet for the harvester +type OutletFactory func() channel.Outleter + // Harvester contains all harvester related data type Harvester struct { id uuid.UUID @@ -75,8 +78,8 @@ type Harvester struct { encoding encoding.Encoding // event/state publishing - forwarder *harvester.Forwarder - publishState func(*util.Data) bool + outletFactory OutletFactory + publishState func(*util.Data) bool onTerminate func() } @@ -87,17 +90,18 @@ func NewHarvester( state file.State, states *file.States, publishState func(*util.Data) bool, - outlet channel.Outleter, + outletFactory OutletFactory, ) (*Harvester, error) { h := &Harvester{ - config: defaultConfig, - state: state, - states: states, - publishState: publishState, - done: make(chan struct{}), - stopWg: &sync.WaitGroup{}, - id: uuid.NewV4(), + config: defaultConfig, + state: state, + states: states, + publishState: publishState, + done: make(chan struct{}), + stopWg: &sync.WaitGroup{}, + id: uuid.NewV4(), + outletFactory: outletFactory, } if err := config.Unpack(&h.config); err != nil { @@ -116,8 +120,6 @@ func NewHarvester( } // Add outlet signal so harvester can also stop itself - outlet = channel.CloseOnSignal(outlet, h.done) - h.forwarder = harvester.NewForwarder(outlet) return h, nil } @@ -164,6 +166,10 @@ func (h *Harvester) Run() error { if h.onTerminate != nil { defer h.onTerminate() } + + outlet := channel.CloseOnSignal(h.outletFactory(), h.done) + forwarder := harvester.NewForwarder(outlet) + // This is to make sure a harvester is not started anymore if stop was already // called before the harvester was started. 
The waitgroup is not incremented afterwards // as otherwise it could happened that between checking for the close channel and incrementing @@ -308,7 +314,7 @@ func (h *Harvester) Run() error { // Always send event to update state, also if lines was skipped // Stop harvester in case of an error - if !h.sendEvent(data) { + if !h.sendEvent(data, forwarder) { return nil } @@ -335,12 +341,12 @@ func (h *Harvester) Stop() { // sendEvent sends event to the spooler channel // Return false if event was not sent -func (h *Harvester) sendEvent(data *util.Data) bool { +func (h *Harvester) sendEvent(data *util.Data, forwarder *harvester.Forwarder) bool { if h.source.HasState() { h.states.Update(data.GetState()) } - err := h.forwarder.Send(data) + err := forwarder.Send(data) return err == nil } diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index bffe042a419..88be68fde1c 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -572,10 +572,21 @@ func (p *Input) isCleanInactive(state file.State) bool { return false } +// subOutletWrap returns a factory method that will wrap the passed outlet +// in a SubOutlet and memoize the result so the wrapping is done only once. 
+func subOutletWrap(outlet channel.Outleter) func() channel.Outleter { + var subOutlet channel.Outleter + return func() channel.Outleter { + if subOutlet == nil { + subOutlet = channel.SubOutlet(outlet) + } + return subOutlet + } +} + // createHarvester creates a new harvester instance from the given state func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harvester, error) { // Each wraps the outlet, for closing the outlet individually - outlet := channel.SubOutlet(p.outlet) h, err := NewHarvester( p.cfg, state, @@ -583,7 +594,7 @@ func (p *Input) createHarvester(state file.State, onTerminate func()) (*Harveste func(d *util.Data) bool { return p.stateOutlet.OnEvent(d) }, - outlet, + subOutletWrap(p.outlet), ) if err == nil { h.onTerminate = onTerminate diff --git a/filebeat/input/stdin/input.go b/filebeat/input/stdin/input.go index 81aa4dab29d..025eb064d81 100644 --- a/filebeat/input/stdin/input.go +++ b/filebeat/input/stdin/input.go @@ -73,7 +73,9 @@ func (p *Input) createHarvester(state file.State) (*log.Harvester, error) { h, err := log.NewHarvester( p.cfg, state, nil, nil, - p.outlet, + func() channel.Outleter { + return p.outlet + }, ) return h, err