From 0a51a685c474b7065f735d63b2fb710b1ea62b1d Mon Sep 17 00:00:00 2001 From: ruflin Date: Mon, 15 May 2017 08:36:48 +0200 Subject: [PATCH] Fix race condition for harvester Start / Stop in registry With reloading enabled, it was possible for a new harvester to be started during the shutdown of filebeat. This is now prevented by taking a lock when starting a harvester, so no new harvesters can be started once shutdown has begun. This is not backported to 5.x because it can only happen on shutdown and does not have any side effects. Closes #3779 --- CHANGELOG.asciidoc | 1 + filebeat/harvester/registry.go | 69 ++++++++++++++++++++-------------- 2 files changed, 42 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6c9a3c4513a..a9b5be4e992 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -31,6 +31,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d - Fix importing the dashboards when the limit for max open files is too low. {issue}4244[4244] *Filebeat* +- Fix race condition on harvester stopping with reloading enabled. 
{issue}3779[3779] *Heartbeat* diff --git a/filebeat/harvester/registry.go b/filebeat/harvester/registry.go index 4b840e66364..08c0a20336c 100644 --- a/filebeat/harvester/registry.go +++ b/filebeat/harvester/registry.go @@ -11,56 +11,69 @@ type Registry struct { sync.RWMutex harvesters map[uuid.UUID]Harvester wg sync.WaitGroup + done chan struct{} } // NewRegistry creates a new registry object func NewRegistry() *Registry { return &Registry{ harvesters: map[uuid.UUID]Harvester{}, + done: make(chan struct{}), } } -func (hr *Registry) add(h Harvester) { - hr.Lock() - defer hr.Unlock() - hr.harvesters[h.ID()] = h -} - -func (hr *Registry) remove(h Harvester) { - hr.Lock() - defer hr.Unlock() - delete(hr.harvesters, h.ID()) +func (r *Registry) remove(h Harvester) { + r.Lock() + defer r.Unlock() + delete(r.harvesters, h.ID()) } // Stop stops all harvesters in the registry -func (hr *Registry) Stop() { - hr.Lock() - for _, hv := range hr.harvesters { - hr.wg.Add(1) +func (r *Registry) Stop() { + r.Lock() + defer func() { + r.Unlock() + r.WaitForCompletion() + }() + // Makes sure no new harvesters are added during stopping + close(r.done) + + for _, hv := range r.harvesters { + r.wg.Add(1) go func(h Harvester) { - hr.wg.Done() + r.wg.Done() h.Stop() }(hv) } - hr.Unlock() - hr.WaitForCompletion() + } // WaitForCompletion can be used to wait until all harvesters are stopped -func (hr *Registry) WaitForCompletion() { - hr.wg.Wait() +func (r *Registry) WaitForCompletion() { + r.wg.Wait() } // Start starts the given harvester and add its to the registry -func (hr *Registry) Start(h Harvester) { +func (r *Registry) Start(h Harvester) { + + // Make sure stop is not called during starting a harvester + r.Lock() + defer r.Unlock() + + // Make sure no new harvesters are started after stop was called + select { + case <-r.done: + return + default: + } - hr.wg.Add(1) - hr.add(h) + r.wg.Add(1) + r.harvesters[h.ID()] = h go func() { defer func() { - hr.remove(h) - hr.wg.Done() + 
r.remove(h) + r.wg.Done() }() // Starts harvester and picks the right type. In case type is not set, set it to default (log) h.Start() @@ -68,8 +81,8 @@ func (hr *Registry) Start(h Harvester) { } // Len returns the current number of harvesters in the registry -func (hr *Registry) Len() uint64 { - hr.RLock() - defer hr.RUnlock() - return uint64(len(hr.harvesters)) +func (r *Registry) Len() uint64 { + r.RLock() + defer r.RUnlock() + return uint64(len(r.harvesters)) }