Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global processor reload #40312

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,9 @@ replace (
github.com/dgraph-io/ristretto => github.com/elastic/ristretto v0.1.1-0.20220602190459-83b0895ca5b3 // Removes glog dependency. See https://github.com/elastic/beats/issues/31810.
github.com/dop251/goja => github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20
github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6

// this is temporary until the elastic-agent-client changes are merged and released
github.com/elastic/elastic-agent-client/v7 => github.com/pchila/elastic-agent-client/v7 v7.0.0-20240712140251-3179341caf2e
github.com/fsnotify/fsevents => github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270
github.com/fsnotify/fsnotify => github.com/adriansr/fsnotify v1.4.8-0.20211018144411-a81f2b630e7c
github.com/godror/godror => github.com/godror/godror v0.33.2 // updating to v0.24.2 caused a breaking change
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,12 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14 h1:s9OJHxfrHxOFBkQrKQAKOE4cYMKMO6s3UxZUxWk+utA=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e h1:l3tZOaQGHhG6s6mmciPzr2ZLVHks4/na7VJj6nUxifc=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240712140251-3179341caf2e h1:c+moxj4HJUXmcVYb/YXk0ei48SM4v8Hc8QeHd5A5/Eo=
github.com/pchila/elastic-agent-client/v7 v7.0.0-20240712140251-3179341caf2e/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
Expand Down
40 changes: 31 additions & 9 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ type Beat struct {
RawConfig *config.C // Raw config that can be unpacked to get Beat specific config data.
IdxSupporter idxmgmt.Supporter

keystore keystore.Keystore
processors processing.Supporter
keystore keystore.Keystore
processors processing.Supporter
processingFactory processing.SupportFactory

InputQueueSize int // Size of the producer queue used by most queues.

Expand Down Expand Up @@ -377,7 +378,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
}
}

var publisher *pipeline.Pipeline
monitors := pipeline.Monitors{
Metrics: reg,
Telemetry: monitoring.GetNamespace("state").GetRegistry(),
Expand All @@ -389,18 +389,39 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
Processors: b.processors,
InputQueueSize: b.InputQueueSize,
}
publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}

reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))
if b.Manager.Enabled() {
if !outputEnabled {
// Create a pipeline builder and don't set the pipeline until we are ready to set the output
pipelineBuilder := pipeline.NewPipelineBuilder(b.Info, monitors, b.Config.Pipeline, outputFactory, settings, b.processingFactory)
b.Publisher = pipelineBuilder
reload.RegisterV2.MustRegisterGlobalProcessors(pipelineBuilder.GlobalProcessorsReloader())
reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(pipelineBuilder.OutputReloader()))
} else {
// don't set the builder, the output is already there.
// TODO Log a warning saying that reload won't work properly

publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}
b.Publisher = publisher
reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))
}
} else {
// normal running without agent
publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %w", err)
}
b.Publisher = publisher
reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader()))
}

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer pipeline.Close()

b.Publisher = publisher
beater, err := bt(&b.Beat, sub)
if err != nil {
return nil, err
Expand Down Expand Up @@ -898,6 +919,7 @@ func (b *Beat) configure(settings Settings) error {
if processingFactory == nil {
processingFactory = processing.MakeDefaultBeatSupport(true)
}
b.processingFactory = processingFactory
b.processors, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig)

b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors",
Expand Down
19 changes: 19 additions & 0 deletions libbeat/common/reload/reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ const OutputRegName = "output"
// APMRegName is the registration name for APM tracing.
const APMRegName = "apm"

// GlobalProcessorRegName is the name under which the reloadable global
// processors configuration is registered in the Registry.
const GlobalProcessorRegName = "global_processors"

// ConfigWithMeta holds a pair of config.C and optional metadata for it
type ConfigWithMeta struct {
// Config to store
Expand Down Expand Up @@ -157,6 +160,14 @@ func (r *Registry) MustRegisterAPM(list Reloadable) {
}
}

// MustRegisterGlobalProcessors is a V2-specific registration function that
// registers obj as the reloadable global processor configuration under
// GlobalProcessorRegName. It panics if the registration fails.
func (r *Registry) MustRegisterGlobalProcessors(obj Reloadable) {
	err := r.Register(GlobalProcessorRegName, obj)
	if err != nil {
		panic(err)
	}
}

// GetInputList is a V2-specific function
// that returns the reloadable list created for an input
func (r *Registry) GetInputList() ReloadableList {
Expand All @@ -181,6 +192,14 @@ func (r *Registry) GetReloadableAPM() Reloadable {
return r.confs[APMRegName]
}

// GetReloadableGlobalProcessors is a V2-specific function that returns the
// Reloadable registered under GlobalProcessorRegName. The lookup is done while
// holding the registry read lock.
func (r *Registry) GetReloadableGlobalProcessors() Reloadable {
	r.RLock()
	reloadable := r.confs[GlobalProcessorRegName]
	r.RUnlock()
	return reloadable
}

// GetRegisteredNames returns the list of names registered
func (r *Registry) GetRegisteredNames() []string {
r.RLock()
Expand Down
158 changes: 158 additions & 0 deletions libbeat/publisher/pipeline/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package pipeline

import (
"errors"
"fmt"
"sync"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher/processing"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)

// ErrNoReloadPipelineAlreadyBuilt is returned by the global processor reloader
// when a reload is requested after the Pipeline has already been built.
var ErrNoReloadPipelineAlreadyBuilt = errors.New("cannot reload as Pipeline has been already built")

// Builder implements the Builder pattern for a Pipeline: it keeps every
// argument needed by LoadWithSettings and only creates the Pipeline when
// build is first called.
type Builder struct {
	// Arguments handed to LoadWithSettings when the Pipeline is built.
	beatInfo   beat.Info
	monitors   Monitors
	config     Config
	makeOutput outputFactory
	settings   Settings

	// Lazy-build state. build writes these fields while holding buildmx;
	// pipelineBuilt is set after the first attempt, whether it succeeded
	// (pipeline is set) or failed (pipelineErr is set).
	buildmx       sync.Mutex
	pipelineBuilt bool
	pipeline      *Pipeline
	pipelineErr   error

	// Reloaders exposed to callers; both point back to this Builder.
	outputReloader          *outputReloader
	globalProcessorReloader *globalProcessorReloader

	// processingFactory creates the processing.Supporter on a global
	// processor reload; when nil the reloader uses the default beat support.
	processingFactory processing.SupportFactory
}

// NewPipelineBuilder returns a Builder holding the given Pipeline arguments.
// No Pipeline is created here; the returned Builder also owns an output
// reloader and a global processor reloader that both refer back to it.
func NewPipelineBuilder(beatInfo beat.Info, monitors Monitors, config Config, outFactory outputFactory, settings Settings, processorsFactory processing.SupportFactory) *Builder {
	builder := new(Builder)
	builder.beatInfo = beatInfo
	builder.monitors = monitors
	builder.config = config
	builder.makeOutput = outFactory
	builder.settings = settings
	builder.processingFactory = processorsFactory

	// The reloaders need the builder itself, so they are attached afterwards.
	builder.outputReloader = &outputReloader{b: builder}
	builder.globalProcessorReloader = &globalProcessorReloader{b: builder}

	return builder
}

// WithGlobalProcessors stores processors in the settings that will be used to
// build the Pipeline. It takes no lock itself and currently always returns nil.
func (b *Builder) WithGlobalProcessors(processors processing.Supporter) error {
	b.settings.Processors = processors

	return nil
}

// build materializes the Pipeline at most once. The first call invokes
// LoadWithSettings and records either the Pipeline or the wrapped error;
// every later call is a no-op. The whole method runs under buildmx.
func (b *Builder) build() {
	b.buildmx.Lock()
	defer b.buildmx.Unlock()

	logger := b.monitors.Logger
	if b.pipelineBuilt {
		logger.Debug("Pipeline already built, skipping..")
		return
	}

	logger.Info("Creating Pipeline...")
	built, buildErr := LoadWithSettings(b.beatInfo, b.monitors, b.config, b.makeOutput, b.settings)
	// Mark the attempt as done regardless of the outcome so it is never retried.
	b.pipelineBuilt = true

	if buildErr == nil {
		b.pipeline = built
		logger.Info("Pipeline created successfully")
		return
	}

	logger.Errorf("Error creating pipeline: %s", buildErr)
	b.pipelineErr = fmt.Errorf("instantiating Pipeline: %w", buildErr)
}

// ConnectWith builds the Pipeline if that has not happened yet and then
// delegates to Pipeline.ConnectWith. If building the Pipeline failed, the
// recorded build error is returned instead.
func (b *Builder) ConnectWith(config beat.ClientConfig) (beat.Client, error) {
	b.build()

	if buildErr := b.pipelineErr; buildErr != nil {
		return nil, buildErr
	}
	return b.pipeline.ConnectWith(config)
}

// Connect builds the Pipeline if that has not happened yet and then delegates
// to Pipeline.Connect. If building the Pipeline failed, the recorded build
// error is returned instead.
func (b *Builder) Connect() (beat.Client, error) {
	b.build()

	if buildErr := b.pipelineErr; buildErr != nil {
		return nil, buildErr
	}
	return b.pipeline.Connect()
}

// OutputReloader returns the output reloader created together with this Builder.
func (b *Builder) OutputReloader() OutputReloader {
	reloader := b.outputReloader
	return reloader
}

// GlobalProcessorsReloader returns the global processor reloader created
// together with this Builder.
func (b *Builder) GlobalProcessorsReloader() *globalProcessorReloader {
	reloader := b.globalProcessorReloader
	return reloader
}

// globalProcessorReloader reloads the global processors of the Builder it
// points to.
type globalProcessorReloader struct {
	// b is the Builder whose settings receive the reloaded processors.
	b *Builder
}

func (gpr *globalProcessorReloader) Reload(config *reload.ConfigWithMeta) error {
builder := gpr.b
builder.buildmx.Lock()
defer builder.buildmx.Unlock()

builder.monitors.Logger.Debugf("Reloading global processor with %s", config.Config)

if builder.pipelineBuilt {
// Too late as the pipeline is built already. We need to restart
builder.monitors.Logger.Debug("Pipeline already instantiated. Returning ErrNoReloadPipelineAlreadyBuilt")
return ErrNoReloadPipelineAlreadyBuilt
}

newProcessors, err := gpr.createProcessors(config.Config)
if err != nil {
return fmt.Errorf("creating new processors with config %s : %w", config.Config, err)

Check failure on line 130 in libbeat/publisher/pipeline/builder.go

View workflow job for this annotation

GitHub Actions / lint (windows)

printf: fmt.Errorf format %s has arg config.Config of wrong type *github.com/elastic/elastic-agent-libs/config.C (govet)
}
builder.WithGlobalProcessors(newProcessors)

Check failure on line 132 in libbeat/publisher/pipeline/builder.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `builder.WithGlobalProcessors` is not checked (errcheck)
builder.monitors.Logger.Debugf("Reloading global processor complete", config.Config)
return nil
}

// createProcessors builds a processing.Supporter from rawProcessorConfig using
// the Builder's processing factory, or the default beat support factory when
// the Builder has none.
func (gpr *globalProcessorReloader) createProcessors(rawProcessorConfig *conf.C) (processing.Supporter, error) {
	factory := gpr.b.processingFactory
	if factory == nil {
		factory = processing.MakeDefaultBeatSupport(true)
	}

	processorsLogger := logp.L().Named("processors")
	return factory(gpr.b.beatInfo, processorsLogger, rawProcessorConfig)
}

// outputReloader reloads the output of the Pipeline owned by the Builder it
// points to.
type outputReloader struct {
	// b is the Builder that builds and holds the Pipeline.
	b *Builder
}

// Reload builds the Pipeline if that has not happened yet and forwards the
// reload request to the Pipeline's own output reloader. If building the
// Pipeline failed, the recorded build error is returned instead.
func (or *outputReloader) Reload(
	cfg *reload.ConfigWithMeta,
	factory func(outputs.Observer, conf.Namespace) (outputs.Group, error),
) error {
	builder := or.b

	// create the pipeline if needed
	builder.build()
	if buildErr := builder.pipelineErr; buildErr != nil {
		return buildErr
	}

	pipelineReloader := builder.pipeline.OutputReloader()
	return pipelineReloader.Reload(cfg, factory)
}
4 changes: 4 additions & 0 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
"github.com/elastic/elastic-agent-libs/logp"
)

type clientUnregisterer interface {

Check failure on line 32 in libbeat/publisher/pipeline/client.go

View workflow job for this annotation

GitHub Actions / lint (windows)

type `clientUnregisterer` is unused (unused)
UnregisterClient(*client)
}

// client connects a beat with the processors and pipeline queue.
type client struct {
logger *logp.Logger
Expand All @@ -42,7 +46,7 @@

// Open state, signaling, and sync primitives for coordinating client Close.
isOpen atomic.Bool // set to false during shutdown, such that no new events will be accepted anymore.
closeOnce sync.Once // closeOnce ensure that the client shutdown sequence is only executed once

Check failure on line 49 in libbeat/publisher/pipeline/client.go

View workflow job for this annotation

GitHub Actions / lint (windows)

field `closeOnce` is unused (unused)

observer observer
eventListener beat.EventListener
Expand Down
21 changes: 11 additions & 10 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type OutputReloader interface {
// The new pipeline will take ownership of queue and outputs. On Close, the
// queue and outputs will be closed.
func New(
beat beat.Info,
beatInfo beat.Info,
monitors Monitors,
userQueueConfig conf.Namespace,
out outputs.Group,
Expand All @@ -121,7 +121,7 @@ func New(
}

p := &Pipeline{
beatInfo: beat,
beatInfo: beatInfo,
monitors: monitors,
observer: nilObserver,
waitCloseTimeout: settings.WaitClose,
Expand All @@ -147,7 +147,7 @@ func New(
return nil, err
}

output, err := newOutputController(beat, monitors, p.observer, queueFactory, settings.InputQueueSize)
output, err := newOutputController(beatInfo, monitors, p.observer, queueFactory, settings.InputQueueSize)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -209,7 +209,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
return nil, err
}

client := &client{
clt := &client{
logger: p.monitors.Logger,
isOpen: atomic.MakeBool(true),
clientListener: cfg.ClientListener,
Expand All @@ -233,7 +233,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {

producerCfg := queue.ProducerConfig{
ACK: func(count int) {
client.observer.eventsACKed(count)
clt.observer.eventsACKed(count)
if ackHandler != nil {
ackHandler.ACKEvents(count)
}
Expand All @@ -244,17 +244,18 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
ackHandler = acker.Nil()
}

client.eventListener = ackHandler
client.waiter = waiter
client.producer = p.outputController.queueProducer(producerCfg)
if client.producer == nil {
clt.eventListener = ackHandler
clt.waiter = waiter
clt.producer = p.outputController.queueProducer(producerCfg)
if clt.producer == nil {
// This can only happen if the pipeline was shut down while clients
// were still waiting to connect.
return nil, fmt.Errorf("client failed to connect because the pipeline is shutting down")
}

p.observer.clientConnected()
return client, nil

return clt, nil
}

func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) {
Expand Down
Loading
Loading