diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 6c9a3c4513af..a9b5be4e9922 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -31,6 +31,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha1...master[Check the HEAD d - Fix importing the dashboards when the limit for max open files is too low. {issue}4244[4244] *Filebeat* +- Fix race condition on harvester stopping with reloading enabled. {issue}3779[3779] *Heartbeat* diff --git a/filebeat/harvester/registry.go b/filebeat/harvester/registry.go index 4b840e663640..08c0a20336cd 100644 --- a/filebeat/harvester/registry.go +++ b/filebeat/harvester/registry.go @@ -11,56 +11,69 @@ type Registry struct { sync.RWMutex harvesters map[uuid.UUID]Harvester wg sync.WaitGroup + done chan struct{} } // NewRegistry creates a new registry object func NewRegistry() *Registry { return &Registry{ harvesters: map[uuid.UUID]Harvester{}, + done: make(chan struct{}), } } -func (hr *Registry) add(h Harvester) { - hr.Lock() - defer hr.Unlock() - hr.harvesters[h.ID()] = h -} - -func (hr *Registry) remove(h Harvester) { - hr.Lock() - defer hr.Unlock() - delete(hr.harvesters, h.ID()) +func (r *Registry) remove(h Harvester) { + r.Lock() + defer r.Unlock() + delete(r.harvesters, h.ID()) } // Stop stops all harvesters in the registry -func (hr *Registry) Stop() { - hr.Lock() - for _, hv := range hr.harvesters { - hr.wg.Add(1) +func (r *Registry) Stop() { + r.Lock() + defer func() { + r.Unlock() + r.WaitForCompletion() + }() + // Makes sure no new harvesters are added during stopping + close(r.done) + + for _, hv := range r.harvesters { + r.wg.Add(1) go func(h Harvester) { - hr.wg.Done() + r.wg.Done() h.Stop() }(hv) } - hr.Unlock() - hr.WaitForCompletion() + } // WaitForCompletion can be used to wait until all harvesters are stopped -func (hr *Registry) WaitForCompletion() { - hr.wg.Wait() +func (r *Registry) WaitForCompletion() { + r.wg.Wait() } // Start starts the given harvester and add its to the registry -func (hr *Registry) Start(h Harvester) { +func (r *Registry) Start(h Harvester) { + + // Make sure stop is not called during starting a harvester + r.Lock() + defer r.Unlock() + + // Make sure no new harvesters are started after stop was called + select { + case <-r.done: + return + default: + } - hr.wg.Add(1) - hr.add(h) + r.wg.Add(1) + r.harvesters[h.ID()] = h go func() { defer func() { - hr.remove(h) - hr.wg.Done() + r.remove(h) + r.wg.Done() }() // Starts harvester and picks the right type. In case type is not set, set it to default (log) h.Start() @@ -68,8 +81,8 @@ func (hr *Registry) Start(h Harvester) { } // Len returns the current number of harvesters in the registry -func (hr *Registry) Len() uint64 { - hr.RLock() - defer hr.RUnlock() - return uint64(len(hr.harvesters)) +func (r *Registry) Len() uint64 { + r.RLock() + defer r.RUnlock() + return uint64(len(r.harvesters)) }