Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global processors reload #40091

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ replace (
github.com/dgraph-io/ristretto => github.com/elastic/ristretto v0.1.1-0.20220602190459-83b0895ca5b3 // Removes glog dependency. See https://github.com/elastic/beats/issues/31810.
github.com/dop251/goja => github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20
github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6

// this is temporary until the elastic-agent-client changes are merged and released
github.com/elastic/elastic-agent-client/v7 => github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e
github.com/fsnotify/fsevents => github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270
github.com/fsnotify/fsnotify => github.com/adriansr/fsnotify v1.4.8-0.20211018144411-a81f2b630e7c
github.com/godror/godror => github.com/godror/godror v0.33.2 // updating to v0.24.2 caused a breaking change
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,10 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14 h1:s9OJHxfrHxOFBkQrKQAKOE4cYMKMO6s3UxZUxWk+utA=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e h1:l3tZOaQGHhG6s6mmciPzr2ZLVHks4/na7VJj6nUxifc=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
Expand Down
2 changes: 2 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}

reload.RegisterV2.MustRegisterGlobalProcessors(publisher.GlobalProcessorsReloader())

reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
Expand Down
19 changes: 19 additions & 0 deletions libbeat/common/reload/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const OutputRegName = "output"
// APMRegName is the registration name for APM tracing.
const APMRegName = "apm"

// GlobalProcessorRegName is the registration name under which the reloadable
// global processors configuration is stored in the Registry.
const GlobalProcessorRegName = "global_processors"

// ConfigWithMeta holds a pair of config.C and optional metadata for it
type ConfigWithMeta struct {
// Config to store
Expand Down Expand Up @@ -157,6 +160,14 @@ func (r *Registry) MustRegisterAPM(list Reloadable) {
}
}

// MustRegisterGlobalProcessors is a V2-specific registration function.
// It registers obj as the reloadable global processors configuration under
// GlobalProcessorRegName and panics if the registration is rejected.
func (r *Registry) MustRegisterGlobalProcessors(obj Reloadable) {
	err := r.Register(GlobalProcessorRegName, obj)
	if err == nil {
		return
	}
	panic(err)
}

// GetInputList is a V2-specific function
// That returns the reloadable list created for an input
func (r *Registry) GetInputList() ReloadableList {
Expand All @@ -181,6 +192,14 @@ func (r *Registry) GetReloadableAPM() Reloadable {
return r.confs[APMRegName]
}

// GetReloadableGlobalProcessors is a V2-specific function.
// It returns the Reloadable stored under GlobalProcessorRegName; the result is
// nil when nothing has been registered under that name.
func (r *Registry) GetReloadableGlobalProcessors() Reloadable {
	// A map lookup cannot panic, so the read lock is released explicitly
	// right after the lookup instead of through defer.
	r.RLock()
	registered := r.confs[GlobalProcessorRegName]
	r.RUnlock()
	return registered
}

// GetRegisteredNames returns the list of names registered
func (r *Registry) GetRegisteredNames() []string {
r.RLock()
Expand Down
9 changes: 9 additions & 0 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
"github.com/elastic/elastic-agent-libs/logp"
)

// clientUnregisterer is the hook a client notifies when it is closed, so that
// whoever keeps track of connected clients can drop it.
type clientUnregisterer interface {
// UnregisterClient is called with the client that is being closed.
UnregisterClient(*client)
}

// client connects a beat with the processors and pipeline queue.
type client struct {
logger *logp.Logger
Expand All @@ -42,11 +46,13 @@

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once

Check failure on line 49 in libbeat/publisher/pipeline/client.go

View workflow job for this annotation

GitHub Actions / lint (windows)

field `closeOnce` is unused (unused)

observer observer
eventListener beat.EventListener
clientListener beat.ClientListener

unregisterer clientUnregisterer
}

type clientCloseWaiter struct {
Expand Down Expand Up @@ -132,6 +138,9 @@

func (c *client) Close() error {
if c.isOpen.Swap(false) {
// Unregister the client
c.unregisterer.UnregisterClient(c)

// Only do shutdown handling the first time Close is called
c.onClosing()

Expand Down
92 changes: 82 additions & 10 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package pipeline

import (
"fmt"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand Down Expand Up @@ -68,6 +69,10 @@ type Pipeline struct {
waitCloseTimeout time.Duration

processors processing.Supporter

clientTracker *clientTracker

processorReloader *GlobalProcessorReloader
}

// Settings is used to pass additional settings to a newly created pipeline instance.
Expand All @@ -83,6 +88,60 @@ type Settings struct {
InputQueueSize int
}

// clientTracker keeps the set of clients currently connected to the pipeline,
// together with the configuration each client was connected with.
type clientTracker struct {
// log receives the tracker's debug and warning messages.
log *logp.Logger
// clients holds the pointers to all the connected clients
clients map[*client]beat.ClientConfig
// clientsMx guards every access to clients.
clientsMx sync.Mutex
}

// RegisterClient records clt, together with the configuration it was
// connected with, in the set of tracked clients. Registering the same client
// again overwrites the stored configuration.
func (ct *clientTracker) RegisterClient(clt *client, cfg beat.ClientConfig) {
	// Debugf is required for the format verb to be interpreted (Debug does not
	// format its arguments). %p prints only the pointer value; %x on a struct
	// pointer would dump the whole struct, including its sync primitives.
	ct.log.Debugf("Registering new client %p", clt)
	ct.clientsMx.Lock()
	defer ct.clientsMx.Unlock()
	ct.clients[clt] = cfg
	ct.log.Debugf("Registered new client %p", clt)
}

// UnregisterClient removes clt from the set of tracked clients. Removing a
// client that is not tracked is a no-op.
func (ct *clientTracker) UnregisterClient(clt *client) {
	// Debugf is required for the format verb to be interpreted (Debug does not
	// format its arguments). %p prints only the pointer value; %x on a struct
	// pointer would dump the whole struct, including its sync primitives.
	ct.log.Debugf("Unregistering client %p", clt)
	ct.clientsMx.Lock()
	defer ct.clientsMx.Unlock()
	delete(ct.clients, clt)
	ct.log.Debugf("Unregistered client %p", clt)
}

// clientAction is a callback invoked with a tracked client and the
// configuration stored for it; a non-nil error aborts ApplyToAllClients.
type clientAction func(clt *client, cfg beat.ClientConfig) error

// ApplyToAllClients invokes action once for every tracked client, passing the
// configuration stored for that client. Iteration stops at the first error,
// which is returned wrapped. The tracker mutex is held for the whole
// iteration, including while action runs.
func (ct *clientTracker) ApplyToAllClients(action clientAction) error {
	ct.clientsMx.Lock()
	defer ct.clientsMx.Unlock()

	for tracked, trackedCfg := range ct.clients {
		if tracked != nil {
			if actionErr := action(tracked, trackedCfg); actionErr != nil {
				return fmt.Errorf("error applying action to clients: %w", actionErr)
			}
			continue
		}
		// A nil key should never be present in the map; skip it instead of
		// handing it to action.
		ct.log.Warn("encountered nil client pointer while iterating over connected pipeline clients. Skipping...")
	}
	return nil
}

// GlobalProcessorReloader is the object whose Reload method receives new
// global processors configuration for a Pipeline.
type GlobalProcessorReloader struct {
// p is the pipeline this reloader belongs to; Reload reads its logger.
p *Pipeline
}

// Reload receives a new global processors configuration. Applying the
// configuration is not implemented yet: for now the received configuration is
// only logged.
//
// It returns an error when the reloader is not attached to a pipeline, and
// nil otherwise.
func (g GlobalProcessorReloader) Reload(config *reload.ConfigWithMeta) error {
	if g.p == nil {
		// Without a pipeline there is no logger and nothing to reload.
		return fmt.Errorf("global processors reloader is not attached to a pipeline")
	}
	log := g.p.monitors.Logger

	if config == nil {
		// Guard against dereferencing a nil config below.
		log.Error("we should reload global processors but no config was provided")
		return nil
	}

	// Errorf is required for the %v verb to be interpreted (Error does not
	// format its arguments).
	log.Errorf("we should reload global processors with %v", config.Config)
	return nil
}

// WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline.
type WaitCloseMode uint8

Expand Down Expand Up @@ -110,7 +169,7 @@ type OutputReloader interface {
// The new pipeline will take ownership of queue and outputs. On Close, the
// queue and outputs will be closed.
func New(
beat beat.Info,
beatInfo beat.Info,
monitors Monitors,
userQueueConfig conf.Namespace,
out outputs.Group,
Expand All @@ -121,11 +180,16 @@ func New(
}

p := &Pipeline{
beatInfo: beat,
beatInfo: beatInfo,
monitors: monitors,
observer: nilObserver,
waitCloseTimeout: settings.WaitClose,
processors: settings.Processors,
clientTracker: &clientTracker{
log: monitors.Logger,
clients: make(map[*client]beat.ClientConfig),
clientsMx: sync.Mutex{},
},
}
if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 {
p.waitCloseTimeout = settings.WaitClose
Expand All @@ -147,7 +211,7 @@ func New(
return nil, err
}

output, err := newOutputController(beat, monitors, p.observer, queueFactory, settings.InputQueueSize)
output, err := newOutputController(beatInfo, monitors, p.observer, queueFactory, settings.InputQueueSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -209,14 +273,15 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
return nil, err
}

client := &client{
clt := &client{
logger: p.monitors.Logger,
isOpen: atomic.MakeBool(true),
clientListener: cfg.ClientListener,
processors: processors,
eventFlags: eventFlags,
canDrop: canDrop,
observer: p.observer,
unregisterer: p.clientTracker,
}

ackHandler := cfg.EventListener
Expand All @@ -233,7 +298,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {

producerCfg := queue.ProducerConfig{
ACK: func(count int) {
client.observer.eventsACKed(count)
clt.observer.eventsACKed(count)
if ackHandler != nil {
ackHandler.ACKEvents(count)
}
Expand All @@ -244,17 +309,20 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
ackHandler = acker.Nil()
}

client.eventListener = ackHandler
client.waiter = waiter
client.producer = p.outputController.queueProducer(producerCfg)
if client.producer == nil {
clt.eventListener = ackHandler
clt.waiter = waiter
clt.producer = p.outputController.queueProducer(producerCfg)
if clt.producer == nil {
// This can only happen if the pipeline was shut down while clients
// were still waiting to connect.
return nil, fmt.Errorf("client failed to connect because the pipeline is shutting down")
}

p.clientTracker.RegisterClient(clt, cfg)

p.observer.clientConnected()
return client, nil

return clt, nil
}

func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) {
Expand All @@ -269,6 +337,10 @@ func (p *Pipeline) OutputReloader() OutputReloader {
return p.outputController
}

// GlobalProcessorsReloader returns the Reloadable that receives global
// processors configuration changes for this pipeline.
func (p *Pipeline) GlobalProcessorsReloader() reload.Reloadable {
	// Returning a nil *GlobalProcessorReloader would produce a non-nil
	// interface value wrapping a nil pointer; calling its value-receiver
	// Reload method would then panic. Create the reloader on first use so the
	// returned value is always usable.
	if p.processorReloader == nil {
		p.processorReloader = &GlobalProcessorReloader{p: p}
	}
	return p.processorReloader
}

// Parses the given config and returns a QueueFactory based on it.
// This helper exists to frontload config parsing errors: if there is an
// error in the queue config, we want it to show up as fatal during
Expand Down
45 changes: 44 additions & 1 deletion x-pack/libbeat/management/managerV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@
client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials())))
} else {
// Normal Elastic-Agent-Client initialisation
agentClient, _, err = client.NewV2FromReader(os.Stdin, versionInfo)
agentClient, _, err = client.NewV2FromReader(os.Stdin, versionInfo, client.WithEmitComponentChanges(true))
if err != nil {
return nil, fmt.Errorf("error reading control config from agent: %w", err)
}
Expand Down Expand Up @@ -293,6 +293,7 @@
"application/yaml",
cm.handleDebugYaml)

go cm.componentListen()
go cm.unitListen()
cm.isRunning = true
return nil
Expand Down Expand Up @@ -472,6 +473,48 @@
}
}

func (cm *BeatV2Manager) componentListen() {
// register signal handler
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

cm.logger.Info("Listening for agent component changes")
for {
select {
// The stopChan channel comes from the Manager interface Stop() method
case <-cm.stopChan:
cm.stopBeat()
case sig := <-sigc:
// we can't duplicate the same logic used by stopChan here.
// A beat will also watch for sigint and shut down, if we call the stopFunc
// callback, either the V2 client or the beat will get a panic,
// as the stopFunc sent by the beats is usually unsafe.
switch sig {
case syscall.SIGINT, syscall.SIGTERM:
cm.logger.Debug("Received sigterm/sigint, stopping")
case syscall.SIGHUP:
cm.logger.Debug("Received sighup, stopping")
}
cm.isRunning = false
cm.UpdateStatus(status.Stopping, "Stopping")
return
case change := <-cm.client.ComponentChanges():
cm.logger.Infof(
"BeatV2Manager.componentListen ComponentConfigIdx(%d)",
change.ConfigIdx)

processors := cm.registry.GetReloadableGlobalProcessors()
if processors == nil {
cm.logger.Debug("Unable to reload global processors: no global processors reloadable registered")
continue
}

//FIXME we need to map the Global Processor config into a proper ConfigWithMeta object
processors.Reload(change.GlobalProcessors)

Check failure on line 513 in x-pack/libbeat/management/managerV2.go

View workflow job for this annotation

GitHub Actions / lint (windows)

cannot use change.GlobalProcessors (variable of type *client.GlobalProcessorsConfig) as *reload.ConfigWithMeta value in argument to processors.Reload (typecheck)
}
}
}

func (cm *BeatV2Manager) unitListen() {
// register signal handler
sigc := make(chan os.Signal, 1)
Expand Down
Loading