diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index bedc5fcb24d0..aeba9c202cf9 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -44,3 +44,5 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - New ReporterV2 interfaces that can receive a context on `Fetch(ctx, reporter)`, or `Run(ctx, reporter)`. {pull}11981[11981] - Generate configuration from `mage` for all Beats. {pull}12618[12618] - Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client. {pull}8543[8543] +- Introduce beat.OutputChooses publisher mode. {pull}12996[12996] +- Ensure that beat.Processor, beat.ProcessorList, and processors.ProcessorList are compatible and can be composed more easily. {pull}12996[12996] diff --git a/libbeat/beat/pipeline.go b/libbeat/beat/pipeline.go index 5743e1cc92a4..d49a10f085bf 100644 --- a/libbeat/beat/pipeline.go +++ b/libbeat/beat/pipeline.go @@ -129,6 +129,7 @@ type PipelineACKHandler struct { } type ProcessorList interface { + Processor All() []Processor } @@ -144,10 +145,16 @@ type Processor interface { type PublishMode uint8 const ( - // DefaultGuarantees are up to the pipeline configuration, as configured by the - // operator. + // DefaultGuarantees are up to the pipeline configuration itself. DefaultGuarantees PublishMode = iota + // OutputChooses mode fully depends on the output and its configuration. + // Events might be dropped based on the users output configuration. + // In this mode no events are dropped within the pipeline. Events are only removed + // after the output has ACKed the events to the pipeline, even if the output + // did drop the events. + OutputChooses + // GuaranteedSend ensures events are retried until acknowledged by the output. // Normally guaranteed sending should be used with some client ACK-handling // to update state keeping track of the sending status. diff --git a/libbeat/processors/processor.go b/libbeat/processors/processor.go index 89f2fde9a289..56900a70d9d2 100644 --- a/libbeat/processors/processor.go +++ b/libbeat/processors/processor.go @@ -40,10 +40,18 @@ type Processor interface { String() string } -func New(config PluginConfig) (*Processors, error) { - procs := &Processors{ - log: logp.NewLogger(logName), +// NewList creates a new empty processor list. +// Additional processors can be added to the List field. +func NewList(log *logp.Logger) *Processors { + if log == nil { + log = logp.NewLogger(logName) } + return &Processors{log: log} +} + +// New creates a list of processors from a list of free user configurations. +func New(config PluginConfig) (*Processors, error) { + procs := NewList(nil) for _, procConfig := range config { // Handle if/then/else processor which has multiple top-level keys. diff --git a/libbeat/publisher/pipeline/config.go b/libbeat/publisher/pipeline/config.go index 0a0c2924a08a..70006d417c6d 100644 --- a/libbeat/publisher/pipeline/config.go +++ b/libbeat/publisher/pipeline/config.go @@ -41,7 +41,7 @@ func validateClientConfig(c *beat.ClientConfig) error { withDrop := false switch m := c.PublishMode; m { - case beat.DefaultGuarantees, beat.GuaranteedSend: + case beat.DefaultGuarantees, beat.GuaranteedSend, beat.OutputChooses: case beat.DropIfFull: withDrop = true default: diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index e8289e28ba52..93d7e6c59f4f 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -49,6 +49,9 @@ import ( // the output clients using a shared work queue for the active outputs.Group. // Processors in the pipeline are executed in the clients go-routine, before // entering the queue. No filtering/processing will occur on the output side. +// +// For client connecting to this pipeline, the default PublishMode is +// OutputChooses. type Pipeline struct { beatInfo beat.Info @@ -273,7 +276,7 @@ func (p *Pipeline) Close() error { return nil } -// Connect creates a new client with default settings +// Connect creates a new client with default settings. func (p *Pipeline) Connect() (beat.Client, error) { return p.ConnectWith(beat.ClientConfig{}) } @@ -281,6 +284,7 @@ func (p *Pipeline) Connect() (beat.Client, error) { // ConnectWith create a new Client for publishing events to the pipeline. // The client behavior on close and ACK handling can be configured by setting // the appropriate fields in the passed ClientConfig. +// If not set otherwise the defaut publish mode is OutputChooses. func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { var ( canDrop bool