Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add do.Once protection to beater interface #33971

Merged
merged 4 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Added append Processor which will append concrete values or values from a field to target. {issue}29934[29934] {pull}33364[33364]
- Add `add_formatted_index` processor that allows the resulting index for an event to be changed based on content from the event. {pull}33800[33800]
- deps: Updated to github.com/elastic/go-sysinfo v1.9.0. {pull}33864[33864]
- Fix panic due to close of already closed channel during shutdown {pull}33971[33971]

*Auditbeat*

Expand Down
4 changes: 3 additions & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"flag"
"fmt"
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/filebeat/channel"
Expand Down Expand Up @@ -67,6 +68,7 @@ type Filebeat struct {
moduleRegistry *fileset.ModuleRegistry
pluginFactory PluginFactory
done chan struct{}
stopOnce sync.Once // wraps the Stop() method
pipeline beat.PipelineConnector
}

Expand Down Expand Up @@ -431,7 +433,7 @@ func (fb *Filebeat) Stop() {
logp.Info("Stopping filebeat")

// Stop Filebeat
close(fb.done)
fb.stopOnce.Do(func() { close(fb.done) })
}

// Create a new pipeline loader (es client) factory
Expand Down
6 changes: 4 additions & 2 deletions heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package beater
import (
"errors"
"fmt"
"sync"

"syscall"
"time"
Expand All @@ -45,7 +46,8 @@ import (

// Heartbeat represents the root datastructure of this beat.
type Heartbeat struct {
done chan struct{}
done chan struct{}
stopOnce sync.Once
// config is used for iterating over elements of the config.
config config.Config
scheduler *scheduler.Scheduler
Expand Down Expand Up @@ -257,7 +259,7 @@ func (bt *Heartbeat) makeAutodiscover(b *beat.Beat) (*autodiscover.Autodiscover,

// Stop stops the beat.
func (bt *Heartbeat) Stop() {
close(bt.done)
bt.stopOnce.Do(func() { close(bt.done) })
}

func makeStatesClient(cfg *conf.C, replace func(monitorstate.StateLoader), runFrom *config.LocationWithID) error {
Expand Down
4 changes: 3 additions & 1 deletion metricbeat/beater/metricbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
// Metricbeat implements the Beater interface for metricbeat.
type Metricbeat struct {
done chan struct{} // Channel used to initiate shutdown.
stopOnce sync.Once // wraps the Stop() method
runners []module.Runner // Active list of module runners.
config Config
autodiscover *autodiscover.Autodiscover
Expand Down Expand Up @@ -272,7 +273,8 @@ func (bt *Metricbeat) Run(b *beat.Beat) error {
// Stop should only be called a single time. Calling it more than once may
// result in undefined behavior.
func (bt *Metricbeat) Stop() {
close(bt.done)
bt.stopOnce.Do(func() { close(bt.done) })

}

// Modules return a list of all configured modules.
Expand Down
10 changes: 6 additions & 4 deletions packetbeat/beater/packetbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package beater

import (
"flag"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -77,9 +78,10 @@ func initialConfig() config.Config {

// Beater object. Contains all objects needed to run the beat
type packetbeat struct {
config *conf.C
factory *processorFactory
done chan struct{}
config *conf.C
factory *processorFactory
done chan struct{}
stopOnce sync.Once
}

// New returns a new Packetbeat beat.Beater.
Expand Down Expand Up @@ -186,5 +188,5 @@ func (pb *packetbeat) runManaged(b *beat.Beat, factory *processorFactory) error
// Called by the Beat stop function
func (pb *packetbeat) Stop() {
logp.Info("Packetbeat send stop signal")
close(pb.done)
pb.stopOnce.Do(func() { close(pb.done) })
}