From 5686c62518a3bdaa467cb51ce305cf661f81dd73 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Wed, 3 Jul 2024 16:29:39 +0200 Subject: [PATCH 1/9] Point to custom elastic-agent-client lib version --- go.mod | 3 +++ go.sum | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/go.mod b/go.mod index 6bf9204f493..d13676caf70 100644 --- a/go.mod +++ b/go.mod @@ -406,6 +406,9 @@ replace ( github.com/dgraph-io/ristretto => github.com/elastic/ristretto v0.1.1-0.20220602190459-83b0895ca5b3 // Removes glog dependency. See https://github.com/elastic/beats/issues/31810. github.com/dop251/goja => github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 + + // this is temporary until the elastic-agent-client changes are merged and released + github.com/elastic/elastic-agent-client/v7 => github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e github.com/fsnotify/fsevents => github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 github.com/fsnotify/fsnotify => github.com/adriansr/fsnotify v1.4.8-0.20211018144411-a81f2b630e7c github.com/godror/godror => github.com/godror/godror v0.33.2 // updating to v0.24.2 caused a breaking change diff --git a/go.sum b/go.sum index 12ad7d6a691..961c694ea6d 100644 --- a/go.sum +++ b/go.sum @@ -1410,6 +1410,10 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14 h1:s9OJHxfrHxOFBkQrKQAKOE4cYMKMO6s3UxZUxWk+utA= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e h1:l3tZOaQGHhG6s6mmciPzr2ZLVHks4/na7VJj6nUxifc= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= From d2ad47deb605f86f5081ef91e67abe5eacd1effd Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Wed, 3 Jul 2024 18:57:44 +0200 Subject: [PATCH 2/9] Add for component changes in BeatV2Manager --- x-pack/libbeat/management/managerV2.go | 38 +++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index e39b394bf2b..f2508189b87 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -183,7 +183,7 @@ func NewV2AgentManager(config *conf.C, registry *reload.Registry) (lbmanagement. client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) } else { // Normal Elastic-Agent-Client initialisation - agentClient, _, err = client.NewV2FromReader(os.Stdin, versionInfo) + agentClient, _, err = client.NewV2FromReader(os.Stdin, versionInfo, client.WithEmitComponentChanges(true)) if err != nil { return nil, fmt.Errorf("error reading control config from agent: %w", err) } @@ -293,6 +293,7 @@ func (cm *BeatV2Manager) Start() error { "application/yaml", cm.handleDebugYaml) + go cm.componentListen() go cm.unitListen() cm.isRunning = true return nil @@ -472,6 +473,41 @@ func (cm *BeatV2Manager) watchErrChan(ctx context.Context) { } } +func (cm *BeatV2Manager) componentListen() { + // register signal handler + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) + + cm.logger.Info("Listening for agent component changes") + for { + select { + // The stopChan channel comes from the Manager interface Stop() method + case <-cm.stopChan: + cm.stopBeat() + case sig := <-sigc: + // we can't duplicate the same logic used by stopChan here. + // A beat will also watch for sigint and shut down, if we call the stopFunc + // callback, either the V2 client or the beat will get a panic, + // as the stopFunc sent by the beats is usually unsafe. + switch sig { + case syscall.SIGINT, syscall.SIGTERM: + cm.logger.Debug("Received sigterm/sigint, stopping") + case syscall.SIGHUP: + cm.logger.Debug("Received sighup, stopping") + } + cm.isRunning = false + cm.UpdateStatus(status.Stopping, "Stopping") + return + case change := <-cm.client.ComponentChanges(): + cm.logger.Infof( + "BeatV2Manager.componentListen ComponentConfigIdx(%d)", + change.ConfigIdx) + + // TODO handle GlobalProcessorConfig + } + } +} + func (cm *BeatV2Manager) unitListen() { // register signal handler sigc := make(chan os.Signal, 1) From 5aedc29686e7c8aa3d71f2c70a5249bc7deee164 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 16 Jul 2024 15:14:30 +0200 Subject: [PATCH 3/9] Add Global processor reload registry --- libbeat/common/reload/reload.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go index d029f473f17..b9bec150406 100644 --- a/libbeat/common/reload/reload.go +++ b/libbeat/common/reload/reload.go @@ -38,6 +38,9 @@ const OutputRegName = "output" // APMRegName is the registation name for APM tracing. const APMRegName = "apm" +// GlobalProcessorRegName is the registration name for global processors config +const GlobalProcessorRegName = "global_processors" + // ConfigWithMeta holds a pair of config.C and optional metadata for it type ConfigWithMeta struct { // Config to store @@ -157,6 +160,14 @@ func (r *Registry) MustRegisterAPM(list Reloadable) { } } +// MustRegisterGlobalProcessors is a V2-specific registration function +// that declares a reloadable global processor configuration +func (r *Registry) MustRegisterGlobalProcessors(obj Reloadable) { + if err := r.Register(GlobalProcessorRegName, obj); err != nil { + panic(err) + } +} + // GetInputList is a V2-specific function // That returns the reloadable list created for an input func (r *Registry) GetInputList() ReloadableList { @@ -181,6 +192,14 @@ func (r *Registry) GetReloadableAPM() Reloadable { return r.confs[APMRegName] } +// GetReloadableGlobalProcessors is a V2-specific function +// That returns the reloader for the registered global processor reloadable object +func (r *Registry) GetReloadableGlobalProcessors() Reloadable { + r.RLock() + defer r.RUnlock() + return r.confs[GlobalProcessorRegName] +} + // GetRegisteredNames returns the list of names registered func (r *Registry) GetRegisteredNames() []string { r.RLock() From a5efdca2fb1afb4f399e6d932f153feebc0e565d Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 16 Jul 2024 16:58:49 +0200 Subject: [PATCH 4/9] Add Pipeline client tracking --- libbeat/publisher/pipeline/client.go | 9 ++++ libbeat/publisher/pipeline/pipeline.go | 75 ++++++++++++++++++++++---- 2 files changed, 74 insertions(+), 10 deletions(-) diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index af756213a63..d1fe332c31e 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -29,6 +29,10 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) +type clientUnregisterer interface { + UnregisterClient(*client) +} + // client connects a beat with the processors and pipeline queue. type client struct { logger *logp.Logger @@ -47,6 +51,8 @@ type client struct { observer observer eventListener beat.EventListener clientListener beat.ClientListener + + unregisterer clientUnregisterer } type clientCloseWaiter struct { @@ -132,6 +138,9 @@ func (c *client) publish(e beat.Event) { func (c *client) Close() error { if c.isOpen.Swap(false) { + // Unregister the client + c.unregisterer.UnregisterClient(c) + // Only do shutdown handling the first time Close is called c.onClosing() diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index a5a13a0584e..c570b6a5ad7 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -22,6 +22,7 @@ package pipeline import ( "fmt" + "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -68,6 +69,8 @@ type Pipeline struct { waitCloseTimeout time.Duration processors processing.Supporter + + clientTracker *clientTracker } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -83,6 +86,49 @@ type Settings struct { InputQueueSize int } +type clientTracker struct { + log *logp.Logger + // clients holds the pointers to all the connected clients + clients map[*client]beat.ClientConfig + clientsMx sync.Mutex +} + +func (ct *clientTracker) RegisterClient(clt *client, cfg beat.ClientConfig) { + ct.log.Debug("Registering new client %x", clt) + ct.clientsMx.Lock() + defer ct.clientsMx.Unlock() + ct.clients[clt] = cfg + ct.log.Debug("Registered new client %x", clt) +} + +func (ct *clientTracker) UnregisterClient(clt *client) { + ct.log.Debug("Unregistering client %x", clt) + ct.clientsMx.Lock() + defer ct.clientsMx.Unlock() + delete(ct.clients, clt) + ct.log.Debug("Unregistered client %x", clt) +} + +type clientAction func(clt *client, cfg beat.ClientConfig) error + +func (ct *clientTracker) ApplyToAllClients(action clientAction) error { + ct.clientsMx.Lock() + defer ct.clientsMx.Unlock() + + for clt, cfg := range ct.clients { + if clt == nil { + // should never happen + ct.log.Warn("encountered nil client pointer while iterating over connected pipeline clients. Skipping...") + continue + } + err := action(clt, cfg) + if err != nil { + return fmt.Errorf("error applying action to clients: %w", err) + } + } + return nil +} + // WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline. type WaitCloseMode uint8 @@ -110,7 +156,7 @@ type OutputReloader interface { // The new pipeline will take ownership of queue and outputs. On Close, the // queue and outputs will be closed. func New( - beat beat.Info, + beatInfo beat.Info, monitors Monitors, userQueueConfig conf.Namespace, out outputs.Group, @@ -121,11 +167,16 @@ func New( } p := &Pipeline{ - beatInfo: beat, + beatInfo: beatInfo, monitors: monitors, observer: nilObserver, waitCloseTimeout: settings.WaitClose, processors: settings.Processors, + clientTracker: &clientTracker{ + log: monitors.Logger, + clients: make(map[*client]beat.ClientConfig), + clientsMx: sync.Mutex{}, + }, } if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 { p.waitCloseTimeout = settings.WaitClose @@ -147,7 +198,7 @@ func New( return nil, err } - output, err := newOutputController(beat, monitors, p.observer, queueFactory, settings.InputQueueSize) + output, err := newOutputController(beatInfo, monitors, p.observer, queueFactory, settings.InputQueueSize) if err != nil { return nil, err } @@ -209,7 +260,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { return nil, err } - client := &client{ + clt := &client{ logger: p.monitors.Logger, isOpen: atomic.MakeBool(true), clientListener: cfg.ClientListener, @@ -217,6 +268,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { eventFlags: eventFlags, canDrop: canDrop, observer: p.observer, + unregisterer: p.clientTracker, } ackHandler := cfg.EventListener @@ -233,7 +285,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { producerCfg := queue.ProducerConfig{ ACK: func(count int) { - client.observer.eventsACKed(count) + clt.observer.eventsACKed(count) if ackHandler != nil { ackHandler.ACKEvents(count) } @@ -244,17 +296,20 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { ackHandler = acker.Nil() } - client.eventListener = ackHandler - client.waiter = waiter - client.producer = p.outputController.queueProducer(producerCfg) - if client.producer == nil { + clt.eventListener = ackHandler + clt.waiter = waiter + clt.producer = p.outputController.queueProducer(producerCfg) + if clt.producer == nil { // This can only happen if the pipeline was shut down while clients // were still waiting to connect. return nil, fmt.Errorf("client failed to connect because the pipeline is shutting down") } + p.clientTracker.RegisterClient(clt, cfg) + p.observer.clientConnected() - return client, nil + + return clt, nil } func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) { From bba1175bf237c6e24555d2eb31c0c1c85f3a869a Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 16 Jul 2024 16:59:21 +0200 Subject: [PATCH 5/9] WIP - add global processor reloader to Pipeline --- libbeat/cmd/instance/beat.go | 2 ++ libbeat/publisher/pipeline/pipeline.go | 17 +++++++++++++++++ x-pack/libbeat/management/managerV2.go | 9 ++++++++- 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index c15d9b8c200..2a4844d68c9 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -394,6 +394,8 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { return nil, fmt.Errorf("error initializing publisher: %w", err) } + reload.RegisterV2.MustRegisterGlobalProcessors(publisher.GlobalProcessorsReloader()) + reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index c570b6a5ad7..726fe31fe79 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -71,6 +71,8 @@ type Pipeline struct { processors processing.Supporter clientTracker *clientTracker + + processorReloader *GlobalProcessorReloader } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -129,6 +131,17 @@ func (ct *clientTracker) ApplyToAllClients(action clientAction) error { return nil } +type GlobalProcessorReloader struct { + p *Pipeline +} + +func (g GlobalProcessorReloader) Reload(config *reload.ConfigWithMeta) error { + log := g.p.monitors.Logger + + log.Error("we should reload global processors with %v", config.Config) + return nil +} + // WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline. type WaitCloseMode uint8 @@ -324,6 +337,10 @@ func (p *Pipeline) OutputReloader() OutputReloader { return p.outputController } +func (p *Pipeline) GlobalProcessorsReloader() reload.Reloadable { + return p.processorReloader +} + // Parses the given config and returns a QueueFactory based on it. // This helper exists to frontload config parsing errors: if there is an // error in the queue config, we want it to show up as fatal during diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index f2508189b87..654f47b5e95 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -503,7 +503,14 @@ func (cm *BeatV2Manager) componentListen() { "BeatV2Manager.componentListen ComponentConfigIdx(%d)", change.ConfigIdx) - // TODO handle GlobalProcessorConfig + processors := cm.registry.GetReloadableGlobalProcessors() + if processors == nil { + cm.logger.Debug("Unable to reload global processors: no global processors reloadable registered") + continue + } + + //FIXME we need to map the Global Processor config into a proper ConfigWithMeta object + processors.Reload(change.GlobalProcessors) } } } From d54d01abc8dad1da9ace831dd6f532ab542ded63 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 22 Jul 2024 10:51:18 +0200 Subject: [PATCH 6/9] fixup! Point to custom elastic-agent-client lib version --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index d13676caf70..e3ed1c72975 100644 --- a/go.mod +++ b/go.mod @@ -408,7 +408,7 @@ replace ( github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 // this is temporary until the elastic-agent-client changes are merged and released - github.com/elastic/elastic-agent-client/v7 => github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e + github.com/elastic/elastic-agent-client/v7 => github.com/pchila/elastic-agent-client/v7 v7.0.0-20240712140251-3179341caf2e github.com/fsnotify/fsevents => github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 github.com/fsnotify/fsnotify => github.com/adriansr/fsnotify v1.4.8-0.20211018144411-a81f2b630e7c github.com/godror/godror => github.com/godror/godror v0.33.2 // updating to v0.24.2 caused a breaking change diff --git a/go.sum b/go.sum index 961c694ea6d..3270189526e 100644 --- a/go.sum +++ b/go.sum @@ -1414,6 +1414,8 @@ github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14 h1: github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE= github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e h1:l3tZOaQGHhG6s6mmciPzr2ZLVHks4/na7VJj6nUxifc= github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240712140251-3179341caf2e h1:c+moxj4HJUXmcVYb/YXk0ei48SM4v8Hc8QeHd5A5/Eo= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240712140251-3179341caf2e/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= From 3eca9f71330c938d74e70854f43ce91243c66ffa Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Mon, 22 Jul 2024 11:23:51 +0200 Subject: [PATCH 7/9] WIP - reload global processor config - SIGSEGV --- libbeat/publisher/pipeline/pipeline.go | 7 ++-- x-pack/libbeat/management/managerV2.go | 54 ++++++++++++++++++++------ 2 files changed, 46 insertions(+), 15 deletions(-) diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 726fe31fe79..733c17f301f 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -22,6 +22,7 @@ package pipeline import ( "fmt" + "os" "sync" "time" @@ -135,10 +136,8 @@ type GlobalProcessorReloader struct { p *Pipeline } -func (g GlobalProcessorReloader) Reload(config *reload.ConfigWithMeta) error { - log := g.p.monitors.Logger - - log.Error("we should reload global processors with %v", config.Config) +func (g *GlobalProcessorReloader) Reload(config *reload.ConfigWithMeta) error { + fmt.Fprintf(os.Stderr, "GlobalProcessorReloader %v should reload global processors with %v", g, config.Config) return nil } diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 654f47b5e95..cf45c322f92 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -105,6 +105,9 @@ type BeatV2Manager struct { // set with the last applied APM config lastAPMCfg *proto.APMConfig + // set with the last applied global processors config + lastGlobalProcessorsConfig *proto.GlobalProcessorsConfig + // used for the debug callback to report as-running config lastBeatOutputCfg *reload.ConfigWithMeta lastBeatInputCfgs []*reload.ConfigWithMeta @@ -499,18 +502,11 @@ func (cm *BeatV2Manager) componentListen() { cm.UpdateStatus(status.Stopping, "Stopping") return case change := <-cm.client.ComponentChanges(): - cm.logger.Infof( - "BeatV2Manager.componentListen ComponentConfigIdx(%d)", - change.ConfigIdx) - - processors := cm.registry.GetReloadableGlobalProcessors() - if processors == nil { - cm.logger.Debug("Unable to reload global processors: no global processors reloadable registered") - continue + cm.logger.Debugw("received component change event", "event", change) + err := cm.reloadGlobalProcessors(change) + if err != nil { + cm.logger.Errorw("Error reloading global processors", "error", err) } - - //FIXME we need to map the Global Processor config into a proper ConfigWithMeta object - processors.Reload(change.GlobalProcessors) } } } @@ -1039,6 +1035,42 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte { return data } +func (cm *BeatV2Manager) reloadGlobalProcessors(change client.Component) error { + cm.logger.Debug("Reloading global processors config") + processors := cm.registry.GetReloadableGlobalProcessors() + if processors == nil { + return fmt.Errorf("reloading global processors: no global processors reloadable registered") + } + + if change.Config == nil { + cm.logger.Debug("Component changes contain a nil config, skipping global processors reload") + return nil + } + + if gproto.Equal(cm.lastGlobalProcessorsConfig, change.Config.Processors) { + cm.logger.Debug("Global processor config is the same as the last applied, skipping reload") + return nil + } + + var newProcessorConfig *conf.C + if change.Config.Processors != nil { + newConf, err := conf.NewConfigFrom(change.Config.Processors) + if err != nil { + return fmt.Errorf("creating new global processor config: %w", err) + } + newProcessorConfig = newConf + } + + err := processors.Reload(&reload.ConfigWithMeta{Config: newProcessorConfig}) + if err != nil { + return fmt.Errorf("reloading global processor config: %w", err) + } + + cm.lastGlobalProcessorsConfig = change.Config.Processors + cm.logger.Debug("Global processors config reloaded") + return nil +} + func getZapcoreLevel(ll client.UnitLogLevel) (zapcore.Level, bool) { switch ll { case client.UnitLogLevelError: From c0de1065c040c04b2608a94557c6c351209b0ca4 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 23 Jul 2024 14:27:09 +0200 Subject: [PATCH 8/9] WIP - Load Pipeline object lazily Delay instantiation of Pipeline object till the first client (input) tries to connect to it. The idea is to use a builder object for the Pipeline on which we can set parameters (in this case the global processor config) until the Pipeline is instantiated. Once the Pipeline is instantiated, the global processor reloader will return a specific error, signaling that to reload configuration we need a new Pipeline (i.e. a beat restart). At this point in the code the reload and restart seem to work, however it seems that the output unit is not set correctly (we probably need some additional hook to correctly wire the Pipeline and the Output object --- libbeat/beat/pipeline.go | 6 + libbeat/cmd/instance/beat.go | 40 +++++-- libbeat/publisher/pipeline/builder.go | 158 +++++++++++++++++++++++++ libbeat/publisher/pipeline/pipeline.go | 16 --- x-pack/libbeat/management/managerV2.go | 51 +++----- 5 files changed, 209 insertions(+), 62 deletions(-) create mode 100644 libbeat/publisher/pipeline/builder.go diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 0917001a86c..f0d5d693582 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -30,6 +30,12 @@ type Pipeline interface { Connect() (Client, error) } +// PipelineBuilder provide a way to build a Pipeline lazily (as late as possible) +type PipelineBuilder interface { + Pipeline + Build() (Pipeline, error) +} + // PipelineConnector wraps the Pipeline interface type PipelineConnector = Pipeline diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 2a4844d68c9..dc16bbb7420 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -97,8 +97,9 @@ type Beat struct { RawConfig *config.C // Raw config that can be unpacked to get Beat specific config data. IdxSupporter idxmgmt.Supporter - keystore keystore.Keystore - processors processing.Supporter + keystore keystore.Keystore + processors processing.Supporter + processingFactory processing.SupportFactory InputQueueSize int // Size of the producer queue used by most queues. @@ -377,7 +378,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { } } - var publisher *pipeline.Pipeline monitors := pipeline.Monitors{ Metrics: reg, Telemetry: monitoring.GetNamespace("state").GetRegistry(), @@ -389,20 +389,39 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { Processors: b.processors, InputQueueSize: b.InputQueueSize, } - publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) - if err != nil { - return nil, fmt.Errorf("error initializing publisher: %w", err) - } - reload.RegisterV2.MustRegisterGlobalProcessors(publisher.GlobalProcessorsReloader()) + if b.Manager.Enabled() { + if !outputEnabled { + // Create a pipeline builder and don't set the pipeline until we are ready to set the output + pipelineBuilder := pipeline.NewPipelineBuilder(b.Info, monitors, b.Config.Pipeline, outputFactory, settings, b.processingFactory) + b.Publisher = pipelineBuilder + reload.RegisterV2.MustRegisterGlobalProcessors(pipelineBuilder.GlobalProcessorsReloader()) + reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(pipelineBuilder.OutputReloader())) + } else { + // don't set the builder, the output is already there. + // TODO Log a warning saying that reload won't work properly - reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) + publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) + if err != nil { + return nil, fmt.Errorf("error initializing publisher: %w", err) + } + b.Publisher = publisher + reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) + } + } else { + // normal running without agent + publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) + if err != nil { + return nil, fmt.Errorf("error initializing publisher: %w", err) + } + b.Publisher = publisher + reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) + } // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, // but refine publisher to disconnect clients on stop automatically // defer pipeline.Close() - b.Publisher = publisher beater, err := bt(&b.Beat, sub) if err != nil { return nil, err @@ -900,6 +919,7 @@ func (b *Beat) configure(settings Settings) error { if processingFactory == nil { processingFactory = processing.MakeDefaultBeatSupport(true) } + b.processingFactory = processingFactory b.processors, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig) b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors", diff --git a/libbeat/publisher/pipeline/builder.go b/libbeat/publisher/pipeline/builder.go new file mode 100644 index 00000000000..28a206b5e63 --- /dev/null +++ b/libbeat/publisher/pipeline/builder.go @@ -0,0 +1,158 @@ +package pipeline + +import ( + "errors" + "fmt" + "sync" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher/processing" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +var ErrNoReloadPipelineAlreadyBuilt = errors.New("cannot reload as Pipeline has been already built") + +// Builder is an implementation of Builder pattern for building a Pipeline +type Builder struct { + // Pipeline parameters + beatInfo beat.Info + monitors Monitors + config Config + makeOutput outputFactory + settings Settings + + // Attributes for lazy loading of the pipeline + buildmx sync.Mutex + pipelineBuilt bool + pipeline *Pipeline + pipelineErr error + + // Reloaders + outputReloader *outputReloader + globalProcessorReloader *globalProcessorReloader + + processingFactory processing.SupportFactory +} + +func NewPipelineBuilder(beatInfo beat.Info, monitors Monitors, config Config, outFactory outputFactory, settings Settings, processorsFactory processing.SupportFactory) *Builder { + b := &Builder{ + beatInfo: beatInfo, + monitors: monitors, + config: config, + makeOutput: outFactory, + settings: settings, + processingFactory: processorsFactory, + } + + b.outputReloader = &outputReloader{b} + b.globalProcessorReloader = &globalProcessorReloader{b: b} + + return b +} + +func (b *Builder) WithGlobalProcessors(processors processing.Supporter) error { + b.settings.Processors = processors + return nil +} + +// build will materialize the Pipeline only once, after Pipeline has been built this is a no-op +func (b *Builder) build() { + b.buildmx.Lock() + defer b.buildmx.Unlock() + log := b.monitors.Logger + + if b.pipelineBuilt { + log.Debug("Pipeline already built, skipping..") + return + } + + log.Info("Creating Pipeline...") + p, err := LoadWithSettings(b.beatInfo, b.monitors, b.config, b.makeOutput, b.settings) + b.pipelineBuilt = true + + if err != nil { + log.Errorf("Error creating pipeline: %s", err) + b.pipelineErr = fmt.Errorf("instantiating Pipeline: %w", err) + return + } + b.pipeline = p + log.Info("Pipeline created successfully") +} + +func (b *Builder) ConnectWith(config beat.ClientConfig) (beat.Client, error) { + b.build() + if b.pipelineErr != nil { + return nil, b.pipelineErr + } + + return b.pipeline.ConnectWith(config) +} + +func (b *Builder) Connect() (beat.Client, error) { + b.build() + if b.pipelineErr != nil { + return nil, b.pipelineErr + } + + return b.pipeline.Connect() +} + +func (b *Builder) OutputReloader() OutputReloader { + return b.outputReloader +} + +func (b *Builder) GlobalProcessorsReloader() *globalProcessorReloader { + return b.globalProcessorReloader +} + +type globalProcessorReloader struct { + b *Builder +} + +func (gpr *globalProcessorReloader) Reload(config *reload.ConfigWithMeta) error { + builder := gpr.b + builder.buildmx.Lock() + defer builder.buildmx.Unlock() + + builder.monitors.Logger.Debugf("Reloading global processor with %s", config.Config) + + if builder.pipelineBuilt { + // Too late as the pipeline is built already. We need to restart + builder.monitors.Logger.Debug("Pipeline already instantiated. Returning ErrNoReloadPipelineAlreadyBuilt") + return ErrNoReloadPipelineAlreadyBuilt + } + + newProcessors, err := gpr.createProcessors(config.Config) + if err != nil { + return fmt.Errorf("creating new processors with config %s : %w", config.Config, err) + } + builder.WithGlobalProcessors(newProcessors) + builder.monitors.Logger.Debugf("Reloading global processor complete", config.Config) + return nil +} + +func (gpr *globalProcessorReloader) createProcessors(rawProcessorConfig *conf.C) (processing.Supporter, error) { + processingFactory := gpr.b.processingFactory + if processingFactory == nil { + processingFactory = processing.MakeDefaultBeatSupport(true) + } + return processingFactory(gpr.b.beatInfo, logp.L().Named("processors"), rawProcessorConfig) +} + +type outputReloader struct { + b *Builder +} + +func (or *outputReloader) Reload( + cfg *reload.ConfigWithMeta, + factory func(outputs.Observer, conf.Namespace) (outputs.Group, error), +) error { + or.b.build() // create the pipeline if needed + if or.b.pipelineErr != nil { + return or.b.pipelineErr + } + return or.b.pipeline.OutputReloader().Reload(cfg, factory) +} diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 733c17f301f..c570b6a5ad7 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -22,7 +22,6 @@ package pipeline import ( "fmt" - "os" "sync" "time" @@ -72,8 +71,6 @@ type Pipeline struct { processors processing.Supporter clientTracker *clientTracker - - processorReloader *GlobalProcessorReloader } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -132,15 +129,6 @@ func (ct *clientTracker) ApplyToAllClients(action clientAction) error { return nil } -type GlobalProcessorReloader struct { - p *Pipeline -} - -func (g *GlobalProcessorReloader) Reload(config *reload.ConfigWithMeta) error { - fmt.Fprintf(os.Stderr, "GlobalProcessorReloader %v should reload global processors with %v", g, config.Config) - return nil -} - // WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline. type WaitCloseMode uint8 @@ -336,10 +324,6 @@ func (p *Pipeline) OutputReloader() OutputReloader { return p.outputController } -func (p *Pipeline) GlobalProcessorsReloader() reload.Reloadable { - return p.processorReloader -} - // Parses the given config and returns a QueueFactory based on it. // This helper exists to frontload config parsing errors: if there is an // error in the queue config, we want it to show up as fatal during diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index cf45c322f92..e8440809af8 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -14,6 +14,7 @@ import ( "syscall" "time" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/joeshaw/multierror" "go.uber.org/zap/zapcore" "google.golang.org/grpc" @@ -296,7 +297,6 @@ func (cm *BeatV2Manager) Start() error { "application/yaml", cm.handleDebugYaml) - go cm.componentListen() go cm.unitListen() cm.isRunning = true return nil @@ -476,41 +476,6 @@ func (cm *BeatV2Manager) watchErrChan(ctx context.Context) { } } -func (cm *BeatV2Manager) componentListen() { - // register signal handler - sigc := make(chan os.Signal, 1) - signal.Notify(sigc, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) - - cm.logger.Info("Listening for agent component changes") - for { - select { - // The stopChan channel comes from the Manager interface Stop() method - case <-cm.stopChan: - cm.stopBeat() - case sig := <-sigc: - // we can't duplicate the same logic used by stopChan here. - // A beat will also watch for sigint and shut down, if we call the stopFunc - // callback, either the V2 client or the beat will get a panic, - // as the stopFunc sent by the beats is usually unsafe. - switch sig { - case syscall.SIGINT, syscall.SIGTERM: - cm.logger.Debug("Received sigterm/sigint, stopping") - case syscall.SIGHUP: - cm.logger.Debug("Received sighup, stopping") - } - cm.isRunning = false - cm.UpdateStatus(status.Stopping, "Stopping") - return - case change := <-cm.client.ComponentChanges(): - cm.logger.Debugw("received component change event", "event", change) - err := cm.reloadGlobalProcessors(change) - if err != nil { - cm.logger.Errorw("Error reloading global processors", "error", err) - } - } - } -} - func (cm *BeatV2Manager) unitListen() { // register signal handler sigc := make(chan os.Signal, 1) @@ -541,6 +506,12 @@ func (cm *BeatV2Manager) unitListen() { cm.isRunning = false cm.UpdateStatus(status.Stopping, "Stopping") return + case change := <-cm.client.ComponentChanges(): + cm.logger.Debugw("received component change event", "event", change) + err := cm.reloadGlobalProcessors(change) + if err != nil { + cm.logger.Errorw("Error reloading global processors", "error", err) + } case change := <-cm.client.UnitChanges(): cm.logger.Infof( "BeatV2Manager.unitListen UnitChanged.ID(%s), UnitChanged.Type(%s), UnitChanged.Trigger(%d): %s/%s", @@ -1062,6 +1033,14 @@ func (cm *BeatV2Manager) reloadGlobalProcessors(change client.Component) error { } err := processors.Reload(&reload.ConfigWithMeta{Config: newProcessorConfig}) + + if errors.Is(err, pipeline.ErrNoReloadPipelineAlreadyBuilt) { + // Pipeline is already instantiated, need to restart + cm.logger.Info("beat is restarting because global processor configuration changed") + cm.Stop() + return nil + } + if err != nil { return fmt.Errorf("reloading global processor config: %w", err) } From d7b891e60dd0671e4fcf2649721ede800050fcb1 Mon Sep 17 00:00:00 2001 From: Paolo Chila Date: Tue, 23 Jul 2024 14:46:33 +0200 Subject: [PATCH 9/9] cleanup --- libbeat/beat/pipeline.go | 6 --- libbeat/publisher/pipeline/client.go | 5 --- libbeat/publisher/pipeline/pipeline.go | 54 -------------------------- 3 files changed, 65 deletions(-) diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index f0d5d693582..0917001a86c 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -30,12 +30,6 @@ type Pipeline interface { Connect() (Client, error) } -// PipelineBuilder provide a way to build a Pipeline lazily (as late as possible) -type PipelineBuilder interface { - Pipeline - Build() (Pipeline, error) -} - // PipelineConnector wraps the Pipeline interface type PipelineConnector = Pipeline diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index d1fe332c31e..3d1d613fdd5 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -51,8 +51,6 @@ type client struct { observer observer eventListener beat.EventListener clientListener beat.ClientListener - - unregisterer clientUnregisterer } type clientCloseWaiter struct { @@ -138,9 +136,6 @@ func (c *client) publish(e beat.Event) { func (c *client) Close() error { if c.isOpen.Swap(false) { - // Unregister the client - c.unregisterer.UnregisterClient(c) - // Only do shutdown handling the first time Close is called c.onClosing() diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index c570b6a5ad7..ed38415186f 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -22,7 +22,6 @@ package pipeline import ( "fmt" - "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -69,8 +68,6 @@ type Pipeline struct { waitCloseTimeout time.Duration processors processing.Supporter - - clientTracker *clientTracker } // Settings is used to pass additional settings to a newly created pipeline instance. @@ -86,49 +83,6 @@ type Settings struct { InputQueueSize int } -type clientTracker struct { - log *logp.Logger - // clients holds the pointers to all the connected clients - clients map[*client]beat.ClientConfig - clientsMx sync.Mutex -} - -func (ct *clientTracker) RegisterClient(clt *client, cfg beat.ClientConfig) { - ct.log.Debug("Registering new client %x", clt) - ct.clientsMx.Lock() - defer ct.clientsMx.Unlock() - ct.clients[clt] = cfg - ct.log.Debug("Registered new client %x", clt) -} - -func (ct *clientTracker) UnregisterClient(clt *client) { - ct.log.Debug("Unregistering client %x", clt) - ct.clientsMx.Lock() - defer ct.clientsMx.Unlock() - delete(ct.clients, clt) - ct.log.Debug("Unregistered client %x", clt) -} - -type clientAction func(clt *client, cfg beat.ClientConfig) error - -func (ct *clientTracker) ApplyToAllClients(action clientAction) error { - ct.clientsMx.Lock() - defer ct.clientsMx.Unlock() - - for clt, cfg := range ct.clients { - if clt == nil { - // should never happen - ct.log.Warn("encountered nil client pointer while iterating over connected pipeline clients. Skipping...") - continue - } - err := action(clt, cfg) - if err != nil { - return fmt.Errorf("error applying action to clients: %w", err) - } - } - return nil -} - // WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline. type WaitCloseMode uint8 @@ -172,11 +126,6 @@ func New( observer: nilObserver, waitCloseTimeout: settings.WaitClose, processors: settings.Processors, - clientTracker: &clientTracker{ - log: monitors.Logger, - clients: make(map[*client]beat.ClientConfig), - clientsMx: sync.Mutex{}, - }, } if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 { p.waitCloseTimeout = settings.WaitClose @@ -268,7 +217,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { eventFlags: eventFlags, canDrop: canDrop, observer: p.observer, - unregisterer: p.clientTracker, } ackHandler := cfg.EventListener @@ -305,8 +253,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { return nil, fmt.Errorf("client failed to connect because the pipeline is shutting down") } - p.clientTracker.RegisterClient(clt, cfg) - p.observer.clientConnected() return clt, nil