From f2c1560b25513b01e3c447db626ecd8b58bbcb43 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 6 Dec 2022 15:51:02 -0800 Subject: [PATCH 1/2] add channel protection to beater interface --- filebeat/beater/filebeat.go | 4 +++- heartbeat/beater/heartbeat.go | 6 ++++-- metricbeat/beater/metricbeat.go | 4 +++- packetbeat/beater/packetbeat.go | 10 ++++++---- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index b2b8027a4337..765f0eea2c17 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -21,6 +21,7 @@ import ( "flag" "fmt" "strings" + "sync" "time" "github.com/elastic/beats/v7/filebeat/channel" @@ -67,6 +68,7 @@ type Filebeat struct { moduleRegistry *fileset.ModuleRegistry pluginFactory PluginFactory done chan struct{} + stopOnce sync.Once // wraps the Stop() method pipeline beat.PipelineConnector } @@ -431,7 +433,7 @@ func (fb *Filebeat) Stop() { logp.Info("Stopping filebeat") // Stop Filebeat - close(fb.done) + fb.stopOnce.Do(func() { close(fb.done) }) } // Create a new pipeline loader (es client) factory diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index e91c7acfe3bb..7845559a64f2 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -20,6 +20,7 @@ package beater import ( "errors" "fmt" + "sync" "syscall" "time" @@ -45,7 +46,8 @@ import ( // Heartbeat represents the root datastructure of this beat. type Heartbeat struct { - done chan struct{} + done chan struct{} + stopOnce sync.Once // config is used for iterating over elements of the config. config config.Config scheduler *scheduler.Scheduler @@ -257,7 +259,7 @@ func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover, // Stop stops the beat. func (bt *Heartbeat) Stop() { - close(bt.done) + bt.stopOnce.Do(func() { close(bt.done) }) } func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFrom *config.LocationWithID) error { diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index 9e2c101a253d..d0f35e2ace49 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -45,6 +45,7 @@ import ( // Metricbeat implements the Beater interface for metricbeat. type Metricbeat struct { done chan struct{} // Channel used to initiate shutdown. + stopOnce sync.Once // wraps the Stop() method runners []module.Runner // Active list of module runners. config Config autodiscover *autodiscover.Autodiscover @@ -272,7 +273,8 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { // Stop should only be called a single time. Calling it more than once may // result in undefined behavior. func (bt *Metricbeat) Stop() { - close(bt.done) + bt.stopOnce.Do(func() { close(bt.done) }) + } // Modules return a list of all configured modules. diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 9830ae4545e1..e6aa51b88e53 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -19,6 +19,7 @@ package beater import ( "flag" + "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -77,9 +78,10 @@ func initialConfig() config.Config { // Beater object. Contains all objects needed to run the beat type packetbeat struct { - config *conf.C - factory *processorFactory - done chan struct{} + config *conf.C + factory *processorFactory + done chan struct{} + stopOnce sync.Once } // New returns a new Packetbeat beat.Beater. @@ -186,5 +188,5 @@ func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error // Called by the Beat stop function func (pb *packetbeat) Stop() { logp.Info("Packetbeat send stop signal") - close(pb.done) + pb.stopOnce.Do(func() { close(pb.done) }) } From 4fd4fc590b7221997ef9bc08d7beb82399e86e02 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 6 Dec 2022 16:13:37 -0800 Subject: [PATCH 2/2] add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index cef48adba16c..38df61da855e 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Added append Processor which will append concrete values or values from a field to target. {issue}29934[29934] {pull}33364[33364] - Add `add_formatted_index` processor that allows the resulting index for an event to be changed based on content from the event. {pull}33800[33800] - deps: Updated to github.com/elastic/go-sysinfo v1.9.0. {pull}33864[33864] +- Fix panic due to close of already closed channel during shutdown {pull}33971[33971] *Auditbeat*