From 51b4bd2e1430345dc47339dadc136fbd68e38c37 Mon Sep 17 00:00:00 2001 From: Alex K <8418476+fearful-symmetry@users.noreply.github.com> Date: Wed, 7 Dec 2022 13:25:28 -0800 Subject: [PATCH] Add do.Once protection to beater interface (#33971) * add channel protection to beater interface * add changelog (cherry picked from commit 0c77112ccc9c83d14abab5703f6f5418be06be8f) --- CHANGELOG.next.asciidoc | 2 ++ filebeat/beater/filebeat.go | 4 +++- heartbeat/beater/heartbeat.go | 6 ++++-- metricbeat/beater/metricbeat.go | 4 +++- packetbeat/beater/packetbeat.go | 10 ++++++---- 5 files changed, 18 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bc483a83a72..587918d8a2e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -126,6 +126,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add `http.pprof` config options for enabling block and mutex profiling. {issue}33572[33572] {pull}33576[33576] - Added append Processor which will append concrete values or values from a field to target. {issue}29934[29934] {pull}33364[33364] - Add `add_formatted_index` processor that allows the resulting index for an event to be changed based on content from the event. {pull}33800[33800] +- deps: Updated to github.com/elastic/go-sysinfo v1.9.0. {pull}33864[33864] +- Fix panic due to close of already closed channel during shutdown {pull}33971[33971] *Auditbeat* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index b2b8027a433..765f0eea2c1 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -21,6 +21,7 @@ import ( "flag" "fmt" "strings" + "sync" "time" "github.com/elastic/beats/v7/filebeat/channel" @@ -67,6 +68,7 @@ type Filebeat struct { moduleRegistry *fileset.ModuleRegistry pluginFactory PluginFactory done chan struct{} + stopOnce sync.Once // wraps the Stop() method pipeline beat.PipelineConnector } @@ -431,7 +433,7 @@ func (fb *Filebeat) Stop() { logp.Info("Stopping filebeat") // Stop Filebeat - close(fb.done) + fb.stopOnce.Do(func() { close(fb.done) }) } // Create a new pipeline loader (es client) factory diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index e91c7acfe3b..7845559a64f 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -20,6 +20,7 @@ package beater import ( "errors" "fmt" + "sync" "syscall" "time" @@ -45,7 +46,8 @@ import ( // Heartbeat represents the root datastructure of this beat. type Heartbeat struct { - done chan struct{} + done chan struct{} + stopOnce sync.Once // config is used for iterating over elements of the config. config config.Config scheduler *scheduler.Scheduler @@ -257,7 +259,7 @@ func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, // Stop stops the beat. func (bt *Heartbeat) Stop() { - close(bt.done) + bt.stopOnce.Do(func() { close(bt.done) }) } func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFrom *config.LocationWithID) error { diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 9e2c101a253..d0f35e2ace4 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -45,6 +45,7 @@ import ( // Metricbeat implements the Beater interface for metricbeat. type Metricbeat struct { done chan struct{} // Channel used to initiate shutdown. + stopOnce sync.Once // wraps the Stop() method runners []module.Runner // Active list of module runners. config Config autodiscover *autodiscover.Autodiscover @@ -272,7 +273,8 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { // Stop should only be called a single time. Calling it more than once may // result in undefined behavior. func (bt *Metricbeat) Stop() { - close(bt.done) + bt.stopOnce.Do(func() { close(bt.done) }) + } // Modules return a list of all configured modules. diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 9830ae4545e..e6aa51b88e5 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -19,6 +19,7 @@ package beater import ( "flag" + "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -77,9 +78,10 @@ func initialConfig() config.Config { // Beater object. Contains all objects needed to run the beat type packetbeat struct { - config *conf.C - factory *processorFactory - done chan struct{} + config *conf.C + factory *processorFactory + done chan struct{} + stopOnce sync.Once } // New returns a new Packetbeat beat.Beater. @@ -186,5 +188,5 @@ func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error // Called by the Beat stop function func (pb *packetbeat) Stop() { logp.Info("Packetbeat send stop signal") - close(pb.done) + pb.stopOnce.Do(func() { close(pb.done) }) }