diff --git a/filebeat/channel/factory.go b/filebeat/channel/factory.go index 226299e0213..956019dc313 100644 --- a/filebeat/channel/factory.go +++ b/filebeat/channel/factory.go @@ -3,13 +3,12 @@ package channel import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" "github.com/elastic/beats/libbeat/publisher/beat" ) type OutletFactory struct { done <-chan struct{} - pipeline publisher.Publisher + pipeline beat.Pipeline eventer beat.ClientEventer wgEvents eventCounter @@ -48,7 +47,7 @@ type prospectorOutletConfig struct { // connecting a prospector to the publisher pipeline. func NewOutletFactory( done <-chan struct{}, - pipeline publisher.Publisher, + pipeline beat.Pipeline, wgEvents eventCounter, ) *OutletFactory { o := &OutletFactory{ @@ -102,7 +101,7 @@ func (f *OutletFactory) Create(cfg *common.Config) (Outleter, error) { } } - client, err := f.pipeline.ConnectX(beat.ClientConfig{ + client, err := f.pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, EventMetadata: config.EventMetadata, Meta: meta, diff --git a/generator/beat/{beat}/beater/{beat}.go.tmpl b/generator/beat/{beat}/beater/{beat}.go.tmpl index 13a31d6e502..0a27be3639f 100644 --- a/generator/beat/{beat}/beater/{beat}.go.tmpl +++ b/generator/beat/{beat}/beater/{beat}.go.tmpl @@ -7,7 +7,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" + pub "github.com/elastic/beats/libbeat/publisher/beat" "{beat_path}/config" ) @@ -15,7 +15,7 @@ import ( type {Beat} struct { done chan struct{} config config.Config - client publisher.Client + client pub.Client } // Creates beater @@ -35,7 +35,12 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { func (bt *{Beat}) Run(b *beat.Beat) error { logp.Info("{beat} is running! Hit CTRL-C to stop it.") - bt.client = b.Publisher.Connect() + var err error + bt.client, err = b.Publisher.Connect() + if err != nil { + return err + } + ticker := time.NewTicker(bt.config.Period) counter := 1 for { @@ -45,12 +50,14 @@ func (bt *{Beat}) Run(b *beat.Beat) error { case <-ticker.C: } - event := common.MapStr{ - "@timestamp": common.Time(time.Now()), - "type": b.Info.Name, - "counter": counter, + event := pub.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "type": b.Info.Name, + "counter": counter, + }, } - bt.client.PublishEvent(event) + bt.client.Publish(event) logp.Info("Event sent") counter++ } diff --git a/heartbeat/beater/manager.go b/heartbeat/beater/manager.go index 195069a7840..980c2333fa1 100644 --- a/heartbeat/beater/manager.go +++ b/heartbeat/beater/manager.go @@ -10,7 +10,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/heartbeat/monitors" @@ -35,7 +34,7 @@ type monitor struct { active map[string]monitorTask - pipeline publisher.Publisher + pipeline beat.Pipeline } type monitorTask struct { @@ -66,7 +65,7 @@ var defaultFilePollInterval = 5 * time.Second const defaultEventType = "monitor" func newMonitorManager( - pipeline publisher.Publisher, + pipeline beat.Pipeline, jobControl jobControl, registry *monitors.Registrar, configs []*common.Config, @@ -207,7 +206,7 @@ func (m *monitor) Update(configs []*common.Config) error { } // create connection per monitorTask - client, err := m.pipeline.ConnectX(beat.ClientConfig{ + client, err := m.pipeline.ConnectWith(beat.ClientConfig{ EventMetadata: t.config.EventMetadata, Processor: processors, }) diff --git a/libbeat/beat/beat.go b/libbeat/beat/beat.go index c27016c860a..59fade43459 100644 --- a/libbeat/beat/beat.go +++ b/libbeat/beat/beat.go @@ -56,8 +56,8 @@ import ( "github.com/elastic/beats/libbeat/outputs/elasticsearch" "github.com/elastic/beats/libbeat/paths" "github.com/elastic/beats/libbeat/plugin" - "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" + "github.com/elastic/beats/libbeat/publisher/beat" + "github.com/elastic/beats/libbeat/publisher/pipeline" svc "github.com/elastic/beats/libbeat/service" "github.com/elastic/beats/libbeat/template" "github.com/elastic/beats/libbeat/version" @@ -108,10 +108,10 @@ type SetupMLCallback func(*Beat) error // Beat contains the basic beat data and the publisher client used to publish // events. type Beat struct { - Info common.BeatInfo // beat metadata. - RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data. - Config BeatConfig // Common Beat configuration data. - Publisher publisher.Publisher // Publisher + Info common.BeatInfo // beat metadata. + RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data. + Config BeatConfig // Common Beat configuration data. + Publisher beat.Pipeline // Publisher pipeline SetupMLCallback SetupMLCallback // setup callback for ML job configs InSetupCmd bool // this is set to true when the `setup` command is called @@ -119,16 +119,24 @@ type Beat struct { // BeatConfig struct contains the basic configuration of every beat type BeatConfig struct { - Shipper publisher.ShipperConfig `config:",inline"` - Output common.ConfigNamespace `config:"output"` - Monitoring *common.Config `config:"xpack.monitoring"` - Logging logp.Logging `config:"logging"` - Processors processors.PluginConfig `config:"processors"` - Path paths.Path `config:"path"` - Dashboards *common.Config `config:"setup.dashboards"` - Template *common.Config `config:"setup.template"` - Kibana *common.Config `config:"setup.kibana"` - Http *common.Config `config:"http"` + // beat top-level settings + Name string `config:"name"` + MaxProcs int `config:"max_procs"` + + // beat internal components configurations + HTTP *common.Config `config:"http"` + Path paths.Path `config:"path"` + Logging logp.Logging `config:"logging"` + + // output/publishing related configurations + Pipeline pipeline.Config `config:",inline"` + Monitoring *common.Config `config:"xpack.monitoring"` + Output common.ConfigNamespace `config:"output"` + + // 'setup' configurations + Dashboards *common.Config `config:"setup.dashboards"` + Template *common.Config `config:"setup.template"` + Kibana *common.Config `config:"setup.kibana"` } var ( @@ -238,10 +246,6 @@ func (b *Beat) createBeater(bt Creator) (Beater, error) { } logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version) - processors, err := processors.New(b.Config.Processors) - if err != nil { - return nil, fmt.Errorf("error initializing processors: %v", err) - } err = b.registerTemplateLoading() if err != nil { @@ -249,16 +253,16 @@ func (b *Beat) createBeater(bt Creator) (Beater, error) { } debugf("Initializing output plugins") - publisher, err := publisher.New(b.Info, b.Config.Output, b.Config.Shipper, processors) + pipeline, err := pipeline.Load(b.Info, b.Config.Pipeline, b.Config.Output) if err != nil { return nil, fmt.Errorf("error initializing publisher: %v", err) } // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, // but refine publisher to disconnect clients on stop automatically - // defer publisher.Stop() + // defer pipeline.Close() - b.Publisher = publisher + b.Publisher = pipeline beater, err := bt(b, sub) if err != nil { return nil, err @@ -317,8 +321,8 @@ func (b *Beat) launch(bt Creator) error { defer logp.Info("%s stopped.", b.Info.Beat) defer logp.LogTotalExpvars(&b.Config.Logging) - if b.Config.Http.Enabled() { - api.Start(b.Config.Http, b.Info) + if b.Config.HTTP.Enabled() { + api.Start(b.Config.HTTP, b.Info) } return beater.Run(b) @@ -458,7 +462,7 @@ func (b *Beat) configure() error { return err } - if name := b.Config.Shipper.Name; name != "" { + if name := b.Config.Name; name != "" { b.Info.Name = name } @@ -482,11 +486,8 @@ func (b *Beat) configure() error { logp.Info("Beat UUID: %v", b.Info.UUID) - if b.Config.Shipper.MaxProcs != nil { - maxProcs := *b.Config.Shipper.MaxProcs - if maxProcs > 0 { - runtime.GOMAXPROCS(maxProcs) - } + if maxProcs := b.Config.MaxProcs; maxProcs > 0 { + runtime.GOMAXPROCS(maxProcs) } return nil diff --git a/libbeat/mock/mockbeat.go b/libbeat/mock/mockbeat.go index 6f17424d3d8..dd1f2529590 100644 --- a/libbeat/mock/mockbeat.go +++ b/libbeat/mock/mockbeat.go @@ -6,7 +6,7 @@ import ( "github.com/elastic/beats/libbeat/beat" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" + pub "github.com/elastic/beats/libbeat/publisher/beat" ) ///*** Mock Beat Setup ***/// @@ -16,7 +16,7 @@ var Name = "mockbeat" type Mockbeat struct { done chan struct{} - client publisher.Client + client pub.Client } // Creates beater @@ -29,13 +29,20 @@ func New(b *beat.Beat, _ *common.Config) (beat.Beater, error) { /// *** Beater interface methods ***/// func (mb *Mockbeat) Run(b *beat.Beat) error { - mb.client = b.Publisher.Connect() + var err error + + mb.client, err = b.Publisher.Connect() + if err != nil { + return err + } // Wait until mockbeat is done - mb.client.PublishEvent(common.MapStr{ - "@timestamp": common.Time(time.Now()), - "type": "mock", - "message": "Mockbeat is alive!", + mb.client.Publish(pub.Event{ + Timestamp: time.Now(), + Fields: common.MapStr{ + "type": "mock", + "message": "Mockbeat is alive!", + }, }) <-mb.done return nil diff --git a/libbeat/monitoring/report/elasticsearch/elasticsearch.go b/libbeat/monitoring/report/elasticsearch/elasticsearch.go index 94f1939209c..00adffa58e0 100644 --- a/libbeat/monitoring/report/elasticsearch/elasticsearch.go +++ b/libbeat/monitoring/report/elasticsearch/elasticsearch.go @@ -17,9 +17,9 @@ import ( "github.com/elastic/beats/libbeat/outputs/outil" "github.com/elastic/beats/libbeat/outputs/transport" "github.com/elastic/beats/libbeat/publisher/beat" - "github.com/elastic/beats/libbeat/publisher/broker" - "github.com/elastic/beats/libbeat/publisher/broker/membroker" "github.com/elastic/beats/libbeat/publisher/pipeline" + "github.com/elastic/beats/libbeat/publisher/queue" + "github.com/elastic/beats/libbeat/publisher/queue/memqueue" ) type reporter struct { @@ -104,8 +104,8 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er out.Clients = append(out.Clients, client) } - brokerFactory := func(e broker.Eventer) (broker.Broker, error) { - return membroker.NewBroker(membroker.Settings{ + queueFactory := func(e queue.Eventer) (queue.Queue, error) { + return memqueue.NewBroker(memqueue.Settings{ Eventer: e, Events: 20, }), nil @@ -115,7 +115,7 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er pipeline, err := pipeline.New( monitoring, - brokerFactory, out, pipeline.Settings{ + queueFactory, out, pipeline.Settings{ WaitClose: 0, WaitCloseMode: pipeline.NoWaitOnClose, }) diff --git a/libbeat/publisher/bc/publisher/async.go b/libbeat/publisher/bc/publisher/async.go deleted file mode 100644 index 84af2ddd461..00000000000 --- a/libbeat/publisher/bc/publisher/async.go +++ /dev/null @@ -1,154 +0,0 @@ -package publisher - -import ( - "sync" - - "github.com/elastic/beats/libbeat/common/op" - "github.com/elastic/beats/libbeat/publisher/beat" -) - -type asyncClient struct { - done <-chan struct{} - - client beat.Client - acker *asyncACKer - - guaranteedClient beat.Client - guaranteedAcker *asyncACKer -} - -type asyncACKer struct { - // Note: mutex is required for sending the message item to the - // asyncACKer and publishing all events, so no two users of the async client can - // interleave events. This is a limitation enforced by the - // old publisher API to be removed - // Note: normally every go-routine wanting to publish should have it's own - // client instance. That is, no contention on the mutex is really expected. - // Still, the mutex is used as additional safety measure - count int - - mutex sync.Mutex - waiting []message -} - -func newAsyncClient(pub *BeatPublisher, done <-chan struct{}) (*asyncClient, error) { - c := &asyncClient{ - done: done, - acker: newAsyncACKer(), - guaranteedAcker: newAsyncACKer(), - } - - var err error - c.guaranteedClient, err = pub.pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - ACKCount: c.guaranteedAcker.onACK, - }) - if err != nil { - return nil, err - } - - c.client, err = pub.pipeline.ConnectWith(beat.ClientConfig{ - ACKCount: c.acker.onACK, - }) - if err != nil { - c.guaranteedClient.Close() - return nil, err - } - - go func() { - // closer - <-done - c.guaranteedClient.Close() - c.client.Close() - }() - - return c, nil -} - -func newAsyncACKer() *asyncACKer { - return &asyncACKer{} -} - -func (c *asyncClient) publish(m message) bool { - if *publishDisabled { - debug("publisher disabled") - op.SigCompleted(m.context.Signal) - return true - } - - count := len(m.data) - single := count == 0 - if single { - count = 1 - } - - client := c.client - acker := c.acker - if m.context.Guaranteed { - client = c.guaranteedClient - acker = c.guaranteedAcker - } - - acker.add(m) - if single { - client.Publish(m.datum) - } else { - client.PublishAll(m.data) - } - - return true -} - -func (a *asyncACKer) add(msg message) { - a.mutex.Lock() - a.waiting = append(a.waiting, msg) - a.mutex.Unlock() -} - -func (a *asyncACKer) onACK(count int) { - for count > 0 { - cnt := a.count - if cnt == 0 { - // we're not waiting for a message its ACK yet -> advance to next message - // object and retry - a.mutex.Lock() - if len(a.waiting) == 0 { - a.mutex.Unlock() - return - } - - active := a.waiting[0] - cnt = len(active.data) - a.mutex.Unlock() - - if cnt == 0 { - cnt = 1 - } - a.count = cnt - continue - } - - acked := count - if acked > cnt { - acked = cnt - } - cnt -= acked - count -= acked - - a.count = cnt - finished := cnt == 0 - if finished { - var msg message - - a.mutex.Lock() - // finished active message - msg = a.waiting[0] - a.waiting = a.waiting[1:] - a.mutex.Unlock() - - if sig := msg.context.Signal; sig != nil { - sig.Completed() - } - } - } -} diff --git a/libbeat/publisher/bc/publisher/client.go b/libbeat/publisher/bc/publisher/client.go deleted file mode 100644 index 3da1b11f42c..00000000000 --- a/libbeat/publisher/bc/publisher/client.go +++ /dev/null @@ -1,221 +0,0 @@ -package publisher - -import ( - "errors" - "time" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/op" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" - "github.com/elastic/beats/libbeat/publisher/beat" -) - -// Metrics that can retrieved through the expvar web interface. -var ( - publishedEvents = monitoring.NewInt(nil, "publisher.events.count") -) - -var ( - ErrClientClosed = errors.New("client closed") -) - -// Client is used by beats to publish new events. -// -// The publish methods add fields that are common to all events. Both methods -// add the 'beat' field that contains name and hostname. Also they add 'tags' -// and 'fields'. -// -// Event publishers can override the default index for an event by adding a -// 'beat' field whose value is a common.MapStr that contains an 'index' field -// specifying the destination index. -// -// event := common.MapStr{ -// // Setting a custom index for a single event. -// "beat": common.MapStr{"index": "custom-index"}, -// } -// -// Event publishers can add fields and tags to an event. The fields will take -// precedence over the global fields defined in the shipper configuration. -// -// event := common.MapStr{ -// // Add custom fields to the root of the event. -// common.EventMetadataKey: common.EventMetadata{ -// UnderRoot: true, -// Fields: common.MapStr{"env": "production"} -// } -// } -type Client interface { - // Close disconnects the Client from the publisher pipeline. - Close() error - - // PublishEvent publishes one event with given options. If Sync option is set, - // PublishEvent will block until output plugins report success or failure state - // being returned by this method. - PublishEvent(event common.MapStr, opts ...ClientOption) bool - - // PublishEvents publishes multiple events with given options. If Guaranteed - // option is set, PublishEvent will block until output plugins report - // success or failure state being returned by this method. - PublishEvents(events []common.MapStr, opts ...ClientOption) bool -} - -type client struct { - canceler *op.Canceler - - publisher *BeatPublisher - sync *syncClient - async *asyncClient -} - -type message struct { - client *client - context Context - datum beat.Event - data []beat.Event -} - -type sender interface { - publish(message) bool -} - -func newClient(pub *BeatPublisher) *client { - c := &client{ - canceler: op.NewCanceler(), - publisher: pub, - } - return c -} - -func (c *client) Close() error { - if c == nil { - return nil - } - - c.canceler.Cancel() - - // atomic decrement clients counter - c.publisher.numClients.Dec() - return nil -} - -func (c *client) PublishEvent(event common.MapStr, opts ...ClientOption) bool { - var metadata common.MapStr - meta, ctx, pipeline, err := c.getPipeline(opts) - if err != nil { - panic(err) - } - - if len(meta) != 0 { - if len(meta) != 1 { - logp.Debug("publish", "too many metadata, pick first") - } - metadata = meta[0] - } - - publishedEvents.Add(1) - return pipeline.publish(message{ - client: c, - context: ctx, - datum: makeEvent(event, metadata), - }) -} - -func (c *client) PublishEvents(events []common.MapStr, opts ...ClientOption) bool { - var metadataAll common.MapStr - meta, ctx, pipeline, err := c.getPipeline(opts) - if err != nil { - panic(err) - } - - if len(meta) != 0 && len(events) != len(meta) { - if len(meta) != 1 { - logp.Debug("publish", - "Number of metadata elements does not match number of events => dropping metadata") - meta = nil - } else { - metadataAll = meta[0] - meta = nil - } - } - - data := make([]beat.Event, 0, len(events)) - for i, event := range events { - metadata := metadataAll - if meta != nil { - metadata = meta[i] - } - data = append(data, makeEvent(event, metadata)) - } - - if len(data) == 0 { - logp.Debug("filter", "No events to publish") - return true - } - - publishedEvents.Add(int64(len(data))) - return pipeline.publish(message{client: c, context: ctx, data: data}) -} - -func (c *client) getPipeline(opts []ClientOption) ([]common.MapStr, Context, sender, error) { - var err error - values, ctx := MakeContext(opts) - - if ctx.Sync { - if c.sync == nil { - c.sync, err = newSyncClient(c.publisher, c.canceler.Done()) - if err != nil { - return nil, ctx, nil, err - } - } - return values, ctx, c.sync, nil - } - - if c.async == nil { - c.async, err = newAsyncClient(c.publisher, c.canceler.Done()) - if err != nil { - return nil, ctx, nil, err - } - } - return values, ctx, c.async, nil -} - -func MakeContext(opts []ClientOption) ([]common.MapStr, Context) { - var ctx Context - var meta []common.MapStr - for _, opt := range opts { - var m []common.MapStr - m, ctx = opt(ctx) - if m != nil { - if meta == nil { - meta = m - } else { - meta = append(meta, m...) - } - } - } - return meta, ctx -} - -func makeEvent(fields common.MapStr, meta common.MapStr) beat.Event { - if logp.IsDebug("publish") { - logp.Debug("publish", "Publish: %s", fields.StringToPrint()) - } - - var ts time.Time - switch value := fields["@timestamp"].(type) { - case time.Time: - ts = value - case common.Time: - ts = time.Time(value) - default: - ts = time.Now() - } - delete(fields, "@timestamp") - - return beat.Event{ - Timestamp: ts, - Meta: meta, - Fields: fields, - } -} diff --git a/libbeat/publisher/bc/publisher/opts.go b/libbeat/publisher/bc/publisher/opts.go deleted file mode 100644 index 1df74eb65ee..00000000000 --- a/libbeat/publisher/bc/publisher/opts.go +++ /dev/null @@ -1,56 +0,0 @@ -package publisher - -import ( - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/op" -) - -// ClientOption allows API users to set additional options when publishing events. -type ClientOption func(option Context) ([]common.MapStr, Context) - -// Guaranteed option will retry publishing the event, until send attempt have -// been ACKed by output plugin. -func Guaranteed(o Context) ([]common.MapStr, Context) { - o.Guaranteed = true - return nil, o -} - -// Sync option will block the event publisher until an event has been ACKed by -// the output plugin or failed. -func Sync(o Context) ([]common.MapStr, Context) { - o.Sync = true - return nil, o -} - -func Signal(signaler op.Signaler) ClientOption { - return func(ctx Context) ([]common.MapStr, Context) { - if ctx.Signal == nil { - ctx.Signal = signaler - } else { - ctx.Signal = op.CombineSignalers(ctx.Signal, signaler) - } - return nil, ctx - } -} - -func Metadata(m common.MapStr) ClientOption { - if len(m) == 0 { - return nilOption - } - return func(ctx Context) ([]common.MapStr, Context) { - return []common.MapStr{m}, ctx - } -} - -func MetadataBatch(m []common.MapStr) ClientOption { - if len(m) == 0 { - return nilOption - } - return func(ctx Context) ([]common.MapStr, Context) { - return m, ctx - } -} - -func nilOption(o Context) ([]common.MapStr, Context) { - return nil, o -} diff --git a/libbeat/publisher/bc/publisher/pipeline.go b/libbeat/publisher/bc/publisher/pipeline.go deleted file mode 100644 index 4aa30378ed7..00000000000 --- a/libbeat/publisher/bc/publisher/pipeline.go +++ /dev/null @@ -1,98 +0,0 @@ -package publisher - -import ( - "errors" - "fmt" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/monitoring" - "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/publisher/broker" - "github.com/elastic/beats/libbeat/publisher/pipeline" -) - -func createPipeline( - beatInfo common.BeatInfo, - shipper ShipperConfig, - processors *processors.Processors, - outcfg common.ConfigNamespace, -) (*pipeline.Pipeline, error) { - - reg := monitoring.Default.GetRegistry("libbeat") - if reg == nil { - reg = monitoring.Default.NewRegistry("libbeat") - } - - var out outputs.Group - if !(*publishDisabled) { - var err error - - if !outcfg.IsSet() { - msg := "No outputs are defined. Please define one under the output section." - logp.Info(msg) - return nil, errors.New(msg) - } - - // TODO: add support to unload/reassign outStats on output reloading - outReg := reg.NewRegistry("output") - outStats := outputs.MakeStats(outReg) - - out, err = outputs.Load(beatInfo, &outStats, outcfg.Name(), outcfg.Config()) - if err != nil { - return nil, err - } - - monitoring.NewString(outReg, "type").Set(outcfg.Name()) - } - - name := shipper.Name - if name == "" { - name = beatInfo.Hostname - } - - settings := pipeline.Settings{ - WaitClose: 0, - WaitCloseMode: pipeline.NoWaitOnClose, - Disabled: *publishDisabled, - Processors: processors, - Annotations: pipeline.Annotations{ - Event: shipper.EventMetadata, - Beat: common.MapStr{ - "name": name, - "hostname": beatInfo.Hostname, - "version": beatInfo.Version, - }, - }, - } - - brokerType := "mem" - if b := shipper.Queue.Name(); b != "" { - brokerType = b - } - - brokerFactory := broker.FindFactory(brokerType) - if brokerFactory == nil { - return nil, fmt.Errorf("'%v' is no valid queue type", brokerType) - } - - brokerConfig := shipper.Queue.Config() - if brokerConfig == nil { - brokerConfig = common.NewConfig() - } - - p, err := pipeline.New( - monitoring.Default.GetRegistry("libbeat"), - func(eventer broker.Eventer) (broker.Broker, error) { - return brokerFactory(eventer, brokerConfig) - }, - out, settings, - ) - if err != nil { - return nil, err - } - - logp.Info("Publisher name: %s", name) - return p, err -} diff --git a/libbeat/publisher/bc/publisher/publish.go b/libbeat/publisher/bc/publisher/publish.go deleted file mode 100644 index ab3312be2be..00000000000 --- a/libbeat/publisher/bc/publisher/publish.go +++ /dev/null @@ -1,127 +0,0 @@ -package publisher - -import ( - "flag" - - "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/common/atomic" - "github.com/elastic/beats/libbeat/common/op" - "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/publisher/beat" -) - -// command line flags -var publishDisabled *bool - -var debug = logp.MakeDebug("publish") - -type Context struct { - publishOptions - Signal op.Signaler -} - -type publishOptions struct { - Guaranteed bool - Sync bool -} - -type TransactionalEventPublisher interface { - PublishTransaction(transaction op.Signaler, events []common.MapStr) -} - -type Publisher interface { - Connect() Client - ConnectX(beat.ClientConfig) (beat.Client, error) - SetACKHandler(beat.PipelineACKHandler) error -} - -type BeatPublisher struct { - disabled bool - name string - - // keep count of clients connected to publisher. A publisher is allowed to - // Stop only if all clients have been disconnected - numClients atomic.Uint32 - - pipeline beat.Pipeline -} - -type ShipperConfig struct { - common.EventMetadata `config:",inline"` // Fields and tags to add to each event. - Name string `config:"name"` - Queue common.ConfigNamespace `config:"queue"` - - // internal publisher queue sizes - MaxProcs *int `config:"max_procs"` -} - -func init() { - publishDisabled = flag.Bool("N", false, "Disable actual publishing for testing") -} - -// Create new PublisherType -func New( - beat common.BeatInfo, - output common.ConfigNamespace, - shipper ShipperConfig, - processors *processors.Processors, -) (*BeatPublisher, error) { - publisher := BeatPublisher{} - if err := publisher.init(beat, output, shipper, processors); err != nil { - return nil, err - } - - return &publisher, nil -} - -func (publisher *BeatPublisher) init( - beat common.BeatInfo, - outConfig common.ConfigNamespace, - shipper ShipperConfig, - processors *processors.Processors, -) error { - var err error - publisher.disabled = *publishDisabled - if publisher.disabled { - logp.Info("Dry run mode. All output types except the file based one are disabled.") - } - - publisher.name = shipper.Name - if publisher.name == "" { - publisher.name = beat.Hostname - } - - publisher.pipeline, err = createPipeline(beat, shipper, processors, outConfig) - if err != nil { - return err - } - - logp.Info("Publisher name: %s", publisher.name) - return nil -} - -func (publisher *BeatPublisher) Stop() { - if publisher.numClients.Load() > 0 { - panic("All clients must disconnect before shutting down publisher pipeline") - } - - publisher.pipeline.Close() -} - -func (publisher *BeatPublisher) Connect() Client { - publisher.numClients.Inc() - return newClient(publisher) -} - -func (publisher *BeatPublisher) ConnectX(config beat.ClientConfig) (beat.Client, error) { - return publisher.pipeline.ConnectWith(config) -} - -func (publisher *BeatPublisher) SetACKHandler(h beat.PipelineACKHandler) error { - return publisher.pipeline.SetACKHandler(h) -} - -func (publisher *BeatPublisher) GetName() string { - return publisher.name -} diff --git a/libbeat/publisher/bc/publisher/sync.go b/libbeat/publisher/bc/publisher/sync.go deleted file mode 100644 index 3e8336701cf..00000000000 --- a/libbeat/publisher/bc/publisher/sync.go +++ /dev/null @@ -1,104 +0,0 @@ -package publisher - -import ( - "github.com/elastic/beats/libbeat/common/op" - "github.com/elastic/beats/libbeat/publisher/beat" -) - -type syncClient struct { - client beat.Client - guaranteedClient beat.Client - done <-chan struct{} - active syncMsgContext -} - -type syncMsgContext struct { - count int - sig chan struct{} -} - -func newSyncClient(pub *BeatPublisher, done <-chan struct{}) (*syncClient, error) { - // always assume sync client is used with 'guarnateed' flag (true for filebeat and winlogbeat) - - c := &syncClient{done: done} - c.active.init() - - var err error - c.guaranteedClient, err = pub.pipeline.ConnectWith(beat.ClientConfig{ - PublishMode: beat.GuaranteedSend, - ACKCount: c.onACK, - }) - if err != nil { - return nil, err - } - - c.client, err = pub.pipeline.ConnectWith(beat.ClientConfig{ - ACKCount: c.onACK, - }) - if err != nil { - c.guaranteedClient.Close() - return nil, err - } - - go func() { - <-done - c.client.Close() - c.guaranteedClient.Close() - }() - - return c, nil -} - -func (c *syncClient) onACK(count int) { - c.active.count -= count - if c.active.count < 0 { - panic("negative event count") - } - - if c.active.count == 0 { - c.active.sig <- struct{}{} - } -} - -func (c *syncClient) publish(m message) bool { - if *publishDisabled { - debug("publisher disabled") - op.SigCompleted(m.context.Signal) - return true - } - - count := len(m.data) - single := count == 0 - if single { - count = 1 - } - - client := c.client - if m.context.Guaranteed { - client = c.guaranteedClient - } - - c.active.count = count - if single { - client.Publish(m.datum) - } else { - client.PublishAll(m.data) - } - - // wait for event or close - select { - case <-c.done: - return false - case <-c.active.sig: - } - - if s := m.context.Signal; s != nil { - s.Completed() - } - - return true -} - -func (ctx *syncMsgContext) init() { - ctx.sig = make(chan struct{}, 1) -} diff --git a/libbeat/publisher/beat/pipeline.go b/libbeat/publisher/beat/pipeline.go index 654337bae0d..86023601fa3 100644 --- a/libbeat/publisher/beat/pipeline.go +++ b/libbeat/publisher/beat/pipeline.go @@ -7,7 +7,6 @@ import ( ) type Pipeline interface { - Close() error Connect() (Client, error) ConnectWith(ClientConfig) (Client, error) SetACKHandler(PipelineACKHandler) error @@ -73,9 +72,9 @@ type ClientEventer interface { Closing() // Closing indicates the client is being shutdown next Closed() // Closed indicates the client being fully shutdown - Published() // event been has successfully forwarded to the publisher pipeline + Published() // event has been successfully forwarded to the publisher pipeline FilteredOut(Event) // event has been filtered out/dropped by processors - DroppedOnPublish(Event) // event has been dropped, while waiting for the broker + DroppedOnPublish(Event) // event has been dropped, while waiting for the queue } // PipelineACKHandler configures some pipeline-wide event ACK handler. diff --git a/libbeat/publisher/broker/broker_reg.go b/libbeat/publisher/broker/broker_reg.go deleted file mode 100644 index 683292e1d16..00000000000 --- a/libbeat/publisher/broker/broker_reg.go +++ /dev/null @@ -1,38 +0,0 @@ -package broker - -import ( - "fmt" - - "github.com/elastic/beats/libbeat/common" -) - -// Global broker type registry for configuring and loading a broker instance -// via common.Config -var brokerReg = map[string]Factory{} - -// RegisterType registers a new broker type. -func RegisterType(name string, f Factory) { - if brokerReg[name] != nil { - panic(fmt.Errorf("broker type '%v' exists already", name)) - } - brokerReg[name] = f -} - -// FindFactory retrieves a broker types constructor. Returns nil if broker type is unknown -func FindFactory(name string) Factory { - return brokerReg[name] -} - -// Load instantiates a new broker. -func Load(eventer Eventer, config common.ConfigNamespace) (Broker, error) { - t, cfg := config.Name(), config.Config() - if t == "" { - t = "mem" - } - - factory := FindFactory(t) - if factory == nil { - return nil, fmt.Errorf("broker type %v undefined", t) - } - return factory(eventer, cfg) -} diff --git a/libbeat/publisher/broker/brokertest/doc.go b/libbeat/publisher/broker/brokertest/doc.go deleted file mode 100644 index 67525dfbbf9..00000000000 --- a/libbeat/publisher/broker/brokertest/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -// Package brokertest provides common functionality tests all broker implementations -// must pass. These tests guarantee a broker fits well into the publisher pipeline. -package brokertest diff --git a/libbeat/publisher/broker/membroker/doc.go b/libbeat/publisher/broker/membroker/doc.go deleted file mode 100644 index c96df29fa75..00000000000 --- a/libbeat/publisher/broker/membroker/doc.go +++ /dev/null @@ -1,4 +0,0 @@ -// Package membroker provides an in-memory publisher.Broker implementation for -// use with the publisher pipeline. -// The broker implementation is registered as broker type "mem". -package membroker diff --git a/libbeat/publisher/pipeline/batch.go b/libbeat/publisher/pipeline/batch.go index 06d28276442..ddb222f0b02 100644 --- a/libbeat/publisher/pipeline/batch.go +++ b/libbeat/publisher/pipeline/batch.go @@ -4,11 +4,11 @@ import ( "sync" "github.com/elastic/beats/libbeat/publisher" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) type Batch struct { - original broker.Batch + original queue.Batch ctx *batchContext ttl int events []publisher.Event @@ -25,7 +25,7 @@ var batchPool = sync.Pool{ }, } -func newBatch(ctx *batchContext, original broker.Batch, ttl int) *Batch { +func newBatch(ctx *batchContext, original queue.Batch, ttl int) *Batch { if original == nil { panic("empty batch") } diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index c8e3f3e3d5a..d2582ee917e 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -6,19 +6,18 @@ import ( "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/libbeat/publisher/beat" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) -// client connects a beat with the processors and pipeline broker. +// client connects a beat with the processors and pipeline queue. // // TODO: All ackers currently drop any late incoming ACK. Some beats still might // be interested in handling/waiting for event ACKs more globally // -> add support for not dropping pending ACKs type client struct { - // active connection to broker pipeline *Pipeline processors beat.Processor - producer broker.Producer + producer queue.Producer mutex sync.Mutex acker acker diff --git a/libbeat/publisher/pipeline/config.go b/libbeat/publisher/pipeline/config.go index 11d2ea0ea97..6c74721e525 100644 --- a/libbeat/publisher/pipeline/config.go +++ b/libbeat/publisher/pipeline/config.go @@ -3,17 +3,20 @@ package pipeline import ( "errors" "fmt" - "time" "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/publisher/beat" ) // Config object for loading a pipeline instance via Load. type Config struct { - WaitShutdown time.Duration `config:"wait_shutdown"` - Broker common.ConfigNamespace `config:"broker"` - Output common.ConfigNamespace `config:"output"` + // Event processing configurations + common.EventMetadata `config:",inline"` // Fields and tags to add to each event. + Processors processors.PluginConfig `config:"processors"` + + // Event queue + Queue common.ConfigNamespace `config:"queue"` } // validateClientConfig checks a ClientConfig can be used with (*Pipeline).ConnectWith. diff --git a/libbeat/publisher/pipeline/consumer.go b/libbeat/publisher/pipeline/consumer.go index d7980cf89e1..391824cfff9 100644 --- a/libbeat/publisher/pipeline/consumer.go +++ b/libbeat/publisher/pipeline/consumer.go @@ -3,10 +3,10 @@ package pipeline import ( "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/logp" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) -// eventConsumer collects and forwards events from the broker to the outputs work queue. +// eventConsumer collects and forwards events from the queue to the outputs work queue. // The eventConsumer is managed by the controller and receives additional pause signals // from the retryer in case of too many events failing to be send or if retryer // is receiving cancelled batches from outputs to be closed on output reloading. @@ -20,15 +20,15 @@ type eventConsumer struct { wait atomic.Bool sig chan consumerSignal - broker broker.Broker - consumer broker.Consumer + queue queue.Queue + consumer queue.Consumer out *outputGroup } type consumerSignal struct { tag consumerEventTag - consumer broker.Consumer + consumer queue.Consumer out *outputGroup } @@ -42,7 +42,7 @@ const ( func newEventConsumer( log *logp.Logger, - broker broker.Broker, + queue queue.Queue, ctx *batchContext, ) *eventConsumer { c := &eventConsumer{ @@ -51,8 +51,8 @@ func newEventConsumer( sig: make(chan consumerSignal, 3), out: nil, - broker: broker, - consumer: broker.Consumer(), + queue: queue, + consumer: queue.Consumer(), ctx: ctx, } @@ -106,15 +106,15 @@ func (c *eventConsumer) updOutput(grp *outputGroup) { out: grp, } - // update eventConsumer with new broker connection - c.consumer = c.broker.Consumer() + // update eventConsumer with new queue connection + c.consumer = c.queue.Consumer() c.sig <- consumerSignal{ tag: sigConsumerUpdateInput, consumer: c.consumer, } } -func (c *eventConsumer) loop(consumer broker.Consumer) { +func (c *eventConsumer) loop(consumer queue.Consumer) { log := c.logger log.Debug("start pipeline event consumer") @@ -147,14 +147,14 @@ func (c *eventConsumer) loop(consumer broker.Consumer) { for { if !paused && c.out != nil && consumer != nil && batch == nil { out = c.out.workQueue - brokerBatch, err := consumer.Get(c.out.batchSize) + queueBatch, err := consumer.Get(c.out.batchSize) if err != nil { out = nil consumer = nil continue } - batch = newBatch(c.ctx, brokerBatch, c.out.timeToLive) + batch = newBatch(c.ctx, queueBatch, c.out.timeToLive) paused = c.paused() if paused { out = nil diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 1b8b9424503..d6e75e770e1 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -3,7 +3,7 @@ package pipeline import ( "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) // outputController manages the pipelines output capabilites, like: @@ -14,7 +14,7 @@ type outputController struct { logger *logp.Logger observer *observer - broker broker.Broker + queue queue.Queue retryer *retryer consumer *eventConsumer @@ -41,12 +41,12 @@ type outputWorker interface { func newOutputController( log *logp.Logger, observer *observer, - b broker.Broker, + b queue.Queue, ) *outputController { c := &outputController{ logger: log, observer: observer, - broker: b, + queue: b, } ctx := &batchContext{} diff --git a/libbeat/publisher/pipeline/module.go b/libbeat/publisher/pipeline/module.go new file mode 100644 index 00000000000..3e8f269f021 --- /dev/null +++ b/libbeat/publisher/pipeline/module.go @@ -0,0 +1,131 @@ +package pipeline + +import ( + "errors" + "flag" + "fmt" + + "github.com/elastic/beats/libbeat/common" + "github.com/elastic/beats/libbeat/logp" + "github.com/elastic/beats/libbeat/monitoring" + "github.com/elastic/beats/libbeat/outputs" + "github.com/elastic/beats/libbeat/processors" + "github.com/elastic/beats/libbeat/publisher/queue" +) + +// Global pipeline module for loading the main pipeline from a configuration object + +// command line flags +var publishDisabled = false + +const defaultQueueType = "mem" + +func init() { + flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing") +} + +// Load uses a Config object to create a new complete Pipeline instance with +// configured queue and outputs. +func Load( + beatInfo common.BeatInfo, + config Config, + outcfg common.ConfigNamespace, +) (*Pipeline, error) { + if publishDisabled { + logp.Info("Dry run mode. All output types except the file based one are disabled.") + } + + processors, err := processors.New(config.Processors) + if err != nil { + return nil, fmt.Errorf("error initializing processors: %v", err) + } + + reg := monitoring.Default.GetRegistry("libbeat") + if reg == nil { + reg = monitoring.Default.NewRegistry("libbeat") + } + + name := beatInfo.Name + settings := Settings{ + WaitClose: 0, + WaitCloseMode: NoWaitOnClose, + Disabled: publishDisabled, + Processors: processors, + Annotations: Annotations{ + Event: config.EventMetadata, + Beat: common.MapStr{ + "name": name, + "hostname": beatInfo.Hostname, + "version": beatInfo.Version, + }, + }, + } + + queueBuilder, err := createQueueBuilder(config.Queue) + if err != nil { + return nil, err + } + + out, err := loadOutput(beatInfo, reg, outcfg) + if err != nil { + return nil, err + } + + p, err := New(reg, queueBuilder, out, settings) + if err != nil { + return nil, err + } + + logp.Info("Publisher name: %s", name) + return p, err +} + +func loadOutput( + beatInfo common.BeatInfo, + reg *monitoring.Registry, + outcfg common.ConfigNamespace, +) (outputs.Group, error) { + if publishDisabled { + return outputs.Group{}, nil + } + + if !outcfg.IsSet() { + msg := "No outputs are defined. Please define one under the output section." + logp.Info(msg) + return outputs.Fail(errors.New(msg)) + } + + // TODO: add support to unload/reassign outStats on output reloading + outReg := reg.NewRegistry("output") + outStats := outputs.MakeStats(outReg) + + out, err := outputs.Load(beatInfo, &outStats, outcfg.Name(), outcfg.Config()) + if err != nil { + return outputs.Fail(err) + } + + monitoring.NewString(outReg, "type").Set(outcfg.Name()) + + return out, nil +} + +func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (queue.Queue, error), error) { + queueType := defaultQueueType + if b := config.Name(); b != "" { + queueType = b + } + + queueFactory := queue.FindFactory(queueType) + if queueFactory == nil { + return nil, fmt.Errorf("'%v' is no valid queue type", queueType) + } + + queueConfig := config.Config() + if queueConfig == nil { + queueConfig = common.NewConfig() + } + + return func(eventer queue.Eventer) (queue.Queue, error) { + return queueFactory(eventer, queueConfig) + }, nil +} diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index 1aad1945c9c..aa62bb95601 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -1,4 +1,4 @@ -// Package pipeline combines all publisher functionality (processors, broker, +// Package pipeline combines all publisher functionality (processors, queue, // outputs) to create instances of complete publisher pipelines, beats can // connect to publish events to. package pipeline @@ -17,25 +17,25 @@ import ( "github.com/elastic/beats/libbeat/processors" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/libbeat/publisher/beat" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) // Pipeline implementation providint all beats publisher functionality. -// The pipeline consists of clients, processors, a central broker, an output +// The pipeline consists of clients, processors, a central queue, an output // controller and the actual outputs. -// The broker implementing the broker.Broker interface is the most entral entity +// The queue implementing the queue.Queue interface is the most entral entity // to the pipeline, providing support for pushung, batching and pulling events. // The pipeline adds different ACKing strategies and wait close support on top -// of the broker. For handling ACKs, the pipeline keeps track of filtered out events, +// of the queue. For handling ACKs, the pipeline keeps track of filtered out events, // to be ACKed to the client in correct order. // The output controller configures a (potentially reloadable) set of load -// balanced output clients. Events will be pulled from the broker and pushed to +// balanced output clients. Events will be pulled from the queue and pushed to // the output clients using a shared work queue for the active outputs.Group. // Processors in the pipeline are executed in the clients go-routine, before -// entering the broker. No filtering/processing will occur on the output side. +// entering the queue. No filtering/processing will occur on the output side. type Pipeline struct { logger *logp.Logger - broker broker.Broker + queue queue.Queue output *outputController observer observer @@ -122,49 +122,14 @@ type waitCloser struct { events sync.WaitGroup } -type brokerFactory func(broker.Eventer) (broker.Broker, error) +type queueFactory func(queue.Eventer) (queue.Queue, error) -// Load uses a Config object to create a new complete Pipeline instance with -// configured broker and outputs. -func Load( - beatInfo common.BeatInfo, - monitoring *monitoring.Registry, - config Config, -) (*Pipeline, error) { - if !config.Output.IsSet() { - return nil, errors.New("no output configured") - } - - brokerFactory := func(e broker.Eventer) (broker.Broker, error) { - return broker.Load(e, config.Broker) - } - - output, err := outputs.Load(beatInfo, nil, config.Output.Name(), config.Output.Config()) - if err != nil { - return nil, err - } - - // TODO: configure pipeline processors - pipeline, err := New(monitoring, brokerFactory, output, Settings{ - WaitClose: config.WaitShutdown, - WaitCloseMode: WaitOnPipelineClose, - }) - if err != nil { - for _, c := range output.Clients { - c.Close() - } - return nil, err - } - - return pipeline, nil -} - -// New create a new Pipeline instance from a broker instance and a set of outputs. -// The new pipeline will take ownership of broker and outputs. On Close, the -// broker and outputs will be closed. +// New create a new Pipeline instance from a queue instance and a set of outputs. +// The new pipeline will take ownership of queue and outputs. On Close, the +// queue and outputs will be closed. func New( metrics *monitoring.Registry, - brokerFactory brokerFactory, + queueFactory queueFactory, out outputs.Group, settings Settings, ) (*Pipeline, error) { @@ -211,19 +176,19 @@ func New( if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 { p.waitCloser = &waitCloser{} - // waitCloser decrements counter on broker ACK (not per client) + // waitCloser decrements counter on queue ACK (not per client) p.eventer.waitClose = p.waitCloser } - p.broker, err = brokerFactory(&p.eventer) + p.queue, err = queueFactory(&p.eventer) if err != nil { return nil, err } - p.eventSema = newSema(p.broker.BufferConfig().Events) + p.eventSema = newSema(p.queue.BufferConfig().Events) p.observer.init(metrics) - p.output = newOutputController(log, &p.observer, p.broker) + p.output = newOutputController(log, &p.observer, p.queue) p.output.Set(out) return p, nil @@ -268,7 +233,7 @@ func (p *Pipeline) SetACKHandler(handler beat.PipelineACKHandler) error { return nil } -// Close stops the pipeline, outputs and broker. +// Close stops the pipeline, outputs and queue. // If WaitClose with WaitOnPipelineClose mode is configured, Close will block // for a duration of WaitClose, if there are still active events in the pipeline. // Note: clients must be closed before calling Close. @@ -296,13 +261,13 @@ func (p *Pipeline) Close() error { // TODO: close/disconnect still active clients - // close output before shutting down broker + // close output before shutting down queue p.output.Close() - // shutdown broker - err := p.broker.Close() + // shutdown queue + err := p.queue.Close() if err != nil { - log.Err("pipeline broker shutdown error: ", err) + log.Err("pipeline queue shutdown error: ", err) } p.observer.cleanup() @@ -354,8 +319,8 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { processors := p.newProcessorPipeline(cfg) acker := p.makeACKer(processors != nil, &cfg, waitClose) - producerCfg := broker.ProducerConfig{ - // only cancel events from broker if acker is configured + producerCfg := queue.ProducerConfig{ + // only cancel events from queue if acker is configured // and no pipeline-wide ACK handler is registered DropOnCancel: acker != nil && p.eventer.cb == nil, } @@ -377,7 +342,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { acker = nilACKer } - producer := p.broker.Producer(producerCfg) + producer := p.queue.Producer(producerCfg) client := &client{ pipeline: p, isOpen: atomic.MakeBool(true), @@ -401,7 +366,7 @@ func (e *pipelineEventer) OnACK(n int) { wc.dec(n) } if e.cb != nil { - e.cb.reportBrokerACK(n) + e.cb.reportQueueACK(n) } } diff --git a/libbeat/publisher/pipeline/pipeline_ack.go b/libbeat/publisher/pipeline/pipeline_ack.go index a79b465b7e5..eb4d1abc556 100644 --- a/libbeat/publisher/pipeline/pipeline_ack.go +++ b/libbeat/publisher/pipeline/pipeline_ack.go @@ -111,8 +111,8 @@ func (b *pipelineEventsACK) createEventACKer(canDrop bool, sema *sema, fn func([ } // pipelineEventCB internally handles active ACKs in the pipeline. -// It receives ACK events from the broker and the individual clients. -// Once the broker returns an ACK to the pipelineEventCB, the worker loop will collect +// It receives ACK events from the queue and the individual clients. +// Once the queue returns an ACK to the pipelineEventCB, the worker loop will collect // events from all clients having published events in the last batch of events // being ACKed. // the PipelineACKHandler will be notified, once all events being ACKed @@ -194,7 +194,7 @@ func (p *pipelineEventCB) close() { // Note: the call blocks, until the ACK handler has collected all active events // from all clients. This ensure an ACK event being fully 'captured' // by the pipeline, before receiving/processing another ACK event. -// In the meantime the broker has the chance of batching-up more ACK events, +// In the meantime the queue has the chance of batching-up more ACK events, // such that only one ACK event is being reported to the pipeline handler func (p *pipelineEventCB) onEvents(events []beat.Event, acked int) { p.pushMsg(eventsMsg{events: events, total: len(events), acked: acked}) @@ -215,7 +215,7 @@ func (p *pipelineEventCB) pushMsg(msg eventsMsg) { } // Starts a new ACKed event. -func (p *pipelineEventCB) reportBrokerACK(acked int) { +func (p *pipelineEventCB) reportQueueACK(acked int) { p.acks <- acked } @@ -285,7 +285,7 @@ func (p *pipelineEventCB) collect(count int) (exit bool) { } } - // signal clients we processed all active ACKs, as reported by broker + // signal clients we processed all active ACKs, as reported by queue for _, sig := range signalers { close(sig) } diff --git a/libbeat/publisher/broker/membroker/ackloop.go b/libbeat/publisher/queue/memqueue/ackloop.go similarity index 99% rename from libbeat/publisher/broker/membroker/ackloop.go rename to libbeat/publisher/queue/memqueue/ackloop.go index 505b91a2441..f5a117383cc 100644 --- a/libbeat/publisher/broker/membroker/ackloop.go +++ b/libbeat/publisher/queue/memqueue/ackloop.go @@ -1,4 +1,4 @@ -package membroker +package memqueue // ackLoop implements the brokers asynchronous ACK worker. // Multiple concurrent ACKs from consecutive published batches will be batched up by the diff --git a/libbeat/publisher/broker/membroker/broker.go b/libbeat/publisher/queue/memqueue/broker.go similarity index 92% rename from libbeat/publisher/broker/membroker/broker.go rename to libbeat/publisher/queue/memqueue/broker.go index c6f353bcf7b..f570921f6a9 100644 --- a/libbeat/publisher/broker/membroker/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -1,4 +1,4 @@ -package membroker +package memqueue import ( "fmt" @@ -8,7 +8,7 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/publisher" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) type Broker struct { @@ -29,7 +29,7 @@ type Broker struct { acks chan int scheduledACKs chan chanList - eventer broker.Eventer + eventer queue.Eventer // wait group for worker shutdown wg sync.WaitGroup @@ -37,7 +37,7 @@ type Broker struct { } type Settings struct { - Eventer broker.Eventer + Eventer queue.Eventer Events int FlushMinEvents int FlushTimeout time.Duration @@ -57,10 +57,10 @@ type chanList struct { } func init() { - broker.RegisterType("mem", create) + queue.RegisterType("mem", create) } -func create(eventer broker.Eventer, cfg *common.Config) (broker.Broker, error) { +func create(eventer queue.Eventer, cfg *common.Config) (queue.Queue, error) { config := defaultConfig if err := cfg.Unpack(&config); err != nil { return nil, err @@ -74,7 +74,7 @@ func create(eventer broker.Eventer, cfg *common.Config) (broker.Broker, error) { }), nil } -// NewBroker creates a new in-memory broker holding up to sz number of events. +// NewBroker creates a new broker based in-memory queue holding up to sz number of events. // If waitOnClose is set to true, the broker will block on Close, until all internal // workers handling incoming messages and ACKs have been shut down. func NewBroker( @@ -147,17 +147,17 @@ func (b *Broker) Close() error { return nil } -func (b *Broker) BufferConfig() broker.BufferConfig { - return broker.BufferConfig{ +func (b *Broker) BufferConfig() queue.BufferConfig { + return queue.BufferConfig{ Events: b.buf.Size(), } } -func (b *Broker) Producer(cfg broker.ProducerConfig) broker.Producer { +func (b *Broker) Producer(cfg queue.ProducerConfig) queue.Producer { return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel) } -func (b *Broker) Consumer() broker.Consumer { +func (b *Broker) Consumer() queue.Consumer { return newConsumer(b) } diff --git a/libbeat/publisher/broker/membroker/buf.go b/libbeat/publisher/queue/memqueue/buf.go similarity index 99% rename from libbeat/publisher/broker/membroker/buf.go rename to libbeat/publisher/queue/memqueue/buf.go index 0e9ad9bf019..1386fd97b10 100644 --- a/libbeat/publisher/broker/membroker/buf.go +++ b/libbeat/publisher/queue/memqueue/buf.go @@ -1,4 +1,4 @@ -package membroker +package memqueue import ( "fmt" diff --git a/libbeat/publisher/broker/membroker/config.go b/libbeat/publisher/queue/memqueue/config.go similarity index 96% rename from libbeat/publisher/broker/membroker/config.go rename to libbeat/publisher/queue/memqueue/config.go index 2616b1aeb83..ef843484a37 100644 --- a/libbeat/publisher/broker/membroker/config.go +++ b/libbeat/publisher/queue/memqueue/config.go @@ -1,4 +1,4 @@ -package membroker +package memqueue import ( "errors" diff --git a/libbeat/publisher/broker/membroker/consume.go b/libbeat/publisher/queue/memqueue/consume.go similarity index 94% rename from libbeat/publisher/broker/membroker/consume.go rename to libbeat/publisher/queue/memqueue/consume.go index 1a82d651269..560c58e4a80 100644 --- a/libbeat/publisher/broker/membroker/consume.go +++ b/libbeat/publisher/queue/memqueue/consume.go @@ -1,4 +1,4 @@ -package membroker +package memqueue import ( "errors" @@ -6,7 +6,7 @@ import ( "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/publisher" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) type consumer struct { @@ -45,7 +45,7 @@ func newConsumer(b *Broker) *consumer { } } -func (c *consumer) Get(sz int) (broker.Batch, error) { +func (c *consumer) Get(sz int) (queue.Batch, error) { // log := c.broker.logger if c.closed.Load() { diff --git a/libbeat/publisher/queue/memqueue/doc.go b/libbeat/publisher/queue/memqueue/doc.go new file mode 100644 index 00000000000..159c2ba7fec --- /dev/null +++ b/libbeat/publisher/queue/memqueue/doc.go @@ -0,0 +1,4 @@ +// Package memqueue provides an in-memory queue.Queue implementation for +// use with the publisher pipeline. +// The queue implementation is registered as queue type "mem". +package memqueue diff --git a/libbeat/publisher/broker/membroker/eventloop.go b/libbeat/publisher/queue/memqueue/eventloop.go similarity index 99% rename from libbeat/publisher/broker/membroker/eventloop.go rename to libbeat/publisher/queue/memqueue/eventloop.go index e1e73d95fad..773317e5254 100644 --- a/libbeat/publisher/broker/membroker/eventloop.go +++ b/libbeat/publisher/queue/memqueue/eventloop.go @@ -1,4 +1,4 @@ -package membroker +package memqueue import ( "time" diff --git a/libbeat/publisher/broker/membroker/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go similarity index 97% rename from libbeat/publisher/broker/membroker/internal_api.go rename to libbeat/publisher/queue/memqueue/internal_api.go index e0a4281b10a..211ae657fe9 100644 --- a/libbeat/publisher/broker/membroker/internal_api.go +++ b/libbeat/publisher/queue/memqueue/internal_api.go @@ -1,4 +1,4 @@ -package membroker +package memqueue import "github.com/elastic/beats/libbeat/publisher" diff --git a/libbeat/publisher/broker/membroker/log.go b/libbeat/publisher/queue/memqueue/log.go similarity index 65% rename from libbeat/publisher/broker/membroker/log.go rename to libbeat/publisher/queue/memqueue/log.go index c83ae4213cf..4f3be49ffcc 100644 --- a/libbeat/publisher/broker/membroker/log.go +++ b/libbeat/publisher/queue/memqueue/log.go @@ -1,4 +1,4 @@ -package membroker +package memqueue import ( "github.com/elastic/beats/libbeat/logp" @@ -9,4 +9,4 @@ type logger interface { Debugf(string, ...interface{}) } -var defaultLogger logger = logp.NewLogger("membroker") +var defaultLogger logger = logp.NewLogger("memqueue") diff --git a/libbeat/publisher/broker/membroker/produce.go b/libbeat/publisher/queue/memqueue/produce.go similarity index 95% rename from libbeat/publisher/broker/membroker/produce.go rename to libbeat/publisher/queue/memqueue/produce.go index 8dc1ea1f63d..bede007c3f2 100644 --- a/libbeat/publisher/broker/membroker/produce.go +++ b/libbeat/publisher/queue/memqueue/produce.go @@ -1,10 +1,10 @@ -package membroker +package memqueue import ( "github.com/elastic/beats/libbeat/common/atomic" "github.com/elastic/beats/libbeat/publisher" "github.com/elastic/beats/libbeat/publisher/beat" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) type forgetfullProducer struct { @@ -35,7 +35,7 @@ type produceState struct { type ackHandler func(count int) -func newProducer(b *Broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) broker.Producer { +func newProducer(b *Broker, cb ackHandler, dropCB func(beat.Event), dropOnCancel bool) queue.Producer { openState := openState{ isOpen: atomic.MakeBool(true), done: make(chan struct{}), diff --git a/libbeat/publisher/broker/membroker/broker_test.go b/libbeat/publisher/queue/memqueue/queue_test.go similarity index 62% rename from libbeat/publisher/broker/membroker/broker_test.go rename to libbeat/publisher/queue/memqueue/queue_test.go index 6cbe92dd5f9..eda01937907 100644 --- a/libbeat/publisher/broker/membroker/broker_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -1,4 +1,4 @@ -package membroker +package memqueue import ( "flag" @@ -6,8 +6,8 @@ import ( "testing" "time" - "github.com/elastic/beats/libbeat/publisher/broker" - "github.com/elastic/beats/libbeat/publisher/broker/brokertest" + "github.com/elastic/beats/libbeat/publisher/queue" + "github.com/elastic/beats/libbeat/publisher/queue/queuetest" ) var seed int64 @@ -34,22 +34,22 @@ func TestProduceConsumer(t *testing.T) { t.Log("batchSize: ", batchSize) t.Log("bufferSize: ", bufferSize) - factory := makeTestBroker(bufferSize) + factory := makeTestQueue(bufferSize) t.Run("single", func(t *testing.T) { - brokertest.TestSingleProducerConsumer(t, events, batchSize, factory) + queuetest.TestSingleProducerConsumer(t, events, batchSize, factory) }) t.Run("multi", func(t *testing.T) { - brokertest.TestMultiProducerConsumer(t, events, batchSize, factory) + queuetest.TestMultiProducerConsumer(t, events, batchSize, factory) }) } func TestProducerCancelRemovesEvents(t *testing.T) { - brokertest.TestProducerCancelRemovesEvents(t, makeTestBroker(1024)) + queuetest.TestProducerCancelRemovesEvents(t, makeTestQueue(1024)) } -func makeTestBroker(sz int) brokertest.BrokerFactory { - return func() broker.Broker { +func makeTestQueue(sz int) queuetest.QueueFactory { + return func() queue.Queue { return NewBroker(Settings{Events: sz, WaitOnClose: true}) } } diff --git a/libbeat/publisher/broker/broker.go b/libbeat/publisher/queue/queue.go similarity index 58% rename from libbeat/publisher/broker/broker.go rename to libbeat/publisher/queue/queue.go index aeb19da5354..6e19da004e5 100644 --- a/libbeat/publisher/broker/broker.go +++ b/libbeat/publisher/queue/queue.go @@ -1,4 +1,4 @@ -package broker +package queue import ( "io" @@ -8,24 +8,24 @@ import ( "github.com/elastic/beats/libbeat/publisher/beat" ) -// Factory for creating a broker used by a pipeline instance. -type Factory func(Eventer, *common.Config) (Broker, error) +// Factory for creating a queue used by a pipeline instance. +type Factory func(Eventer, *common.Config) (Queue, error) -// Eventer listens to special events to be send by broker implementations. +// Eventer listens to special events to be send by queue implementations. type Eventer interface { OnACK(int) // number of consecutively published messages, acked by producers } -// Broker is responsible for accepting, forwarding and ACKing events. -// A broker will receive and buffer single events from its producers. -// Consumers will receive events in batches from the brokers buffers. +// Queue is responsible for accepting, forwarding and ACKing events. +// A queue will receive and buffer single events from its producers. +// Consumers will receive events in batches from the queues buffers. // Once a consumer has finished processing a batch, it must ACK the batch, for -// the broker to advance its buffers. Events in progress or ACKed are not readable -// from the broker. -// When the broker decides it is safe to progress (events have been ACKed by +// the queue to advance its buffers. Events in progress or ACKed are not readable +// from the queue. +// When the queue decides it is safe to progress (events have been ACKed by // consumer or flush to some other intermediate storage), it will send an ACK signal // with the number of ACKed events to the Producer (ACK happens in batches). -type Broker interface { +type Queue interface { io.Closer BufferConfig() BufferConfig @@ -40,33 +40,33 @@ type Broker interface { // but still dropping events, the pipeline can use the buffer information, // to define an upper bound of events being active in the pipeline. type BufferConfig struct { - Events int // can be <= 0, if broker can not determine limit + Events int // can be <= 0, if queue can not determine limit } // ProducerConfig as used by the Pipeline to configure some custom callbacks -// between pipeline and broker. +// between pipeline and queue. type ProducerConfig struct { // if ACK is set, the callback will be called with number of events produced - // by the producer instance and being ACKed by the broker. + // by the producer instance and being ACKed by the queue. ACK func(count int) - // OnDrop provided to the broker, to report events being silently dropped by - // the broker. For example an async producer close and publish event, + // OnDrop provided to the queue, to report events being silently dropped by + // the queue. For example an async producer close and publish event, // with close happening early might result in the event being dropped. The callback - // gives a brokers user a chance to keep track of total number of events - // being buffered by the broker. + // gives a queue user a chance to keep track of total number of events + // being buffered by the queue. OnDrop func(beat.Event) - // DropOnCancel is a hint to the broker to drop events if the producer disconnects + // DropOnCancel is a hint to the queue to drop events if the producer disconnects // via Cancel. DropOnCancel bool } // Producer interface to be used by the pipelines client to forward events to be -// published to the broker. -// When a producer calls `Cancel`, it's up to the broker to send or remove +// published to the queue. +// When a producer calls `Cancel`, it's up to the queue to send or remove // events not yet ACKed. -// Note: A broker is still allowed to send the ACK signal after Cancel. The +// Note: A queue is still allowed to send the ACK signal after Cancel. The // pipeline client must filter out ACKs after cancel. type Producer interface { Publish(event publisher.Event) bool @@ -76,14 +76,14 @@ type Producer interface { // Consumer interface to be used by the pipeline output workers. // The `Get` method retrieves a batch of events up to size `sz`. If sz <= 0, -// the batch size is up to the broker. +// the batch size is up to the queue. type Consumer interface { Get(sz int) (Batch, error) Close() error } // Batch of events to be returned to Consumers. The `ACK` method will send the -// ACK signal to the broker. +// ACK signal to the queue. type Batch interface { Events() []publisher.Event ACK() diff --git a/libbeat/publisher/queue/queue_reg.go b/libbeat/publisher/queue/queue_reg.go new file mode 100644 index 00000000000..b445c6424bf --- /dev/null +++ b/libbeat/publisher/queue/queue_reg.go @@ -0,0 +1,38 @@ +package queue + +import ( + "fmt" + + "github.com/elastic/beats/libbeat/common" +) + +// Global queue type registry for configuring and loading a queue instance +// via common.Config +var queueReg = map[string]Factory{} + +// RegisterType registers a new queue type. +func RegisterType(name string, f Factory) { + if queueReg[name] != nil { + panic(fmt.Errorf("queue type '%v' exists already", name)) + } + queueReg[name] = f +} + +// FindFactory retrieves a queue types constructor. Returns nil if queue type is unknown +func FindFactory(name string) Factory { + return queueReg[name] +} + +// Load instantiates a new queue. +func Load(eventer Eventer, config common.ConfigNamespace) (Queue, error) { + t, cfg := config.Name(), config.Config() + if t == "" { + t = "mem" + } + + factory := FindFactory(t) + if factory == nil { + return nil, fmt.Errorf("queue type %v undefined", t) + } + return factory(eventer, cfg) +} diff --git a/libbeat/publisher/queue/queuetest/doc.go b/libbeat/publisher/queue/queuetest/doc.go new file mode 100644 index 00000000000..9ed1958e0e6 --- /dev/null +++ b/libbeat/publisher/queue/queuetest/doc.go @@ -0,0 +1,3 @@ +// Package queuetest provides common functionality tests all queue implementations +// must pass. These tests guarantee a queue fits well into the publisher pipeline. +package queuetest diff --git a/libbeat/publisher/broker/brokertest/event.go b/libbeat/publisher/queue/queuetest/event.go similarity index 94% rename from libbeat/publisher/broker/brokertest/event.go rename to libbeat/publisher/queue/queuetest/event.go index 12830e07e3e..2225927197d 100644 --- a/libbeat/publisher/broker/brokertest/event.go +++ b/libbeat/publisher/queue/queuetest/event.go @@ -1,4 +1,4 @@ -package brokertest +package queuetest import ( "time" diff --git a/libbeat/publisher/broker/brokertest/log.go b/libbeat/publisher/queue/queuetest/log.go similarity index 99% rename from libbeat/publisher/broker/brokertest/log.go rename to libbeat/publisher/queue/queuetest/log.go index 2647e7110a9..00b98e7728b 100644 --- a/libbeat/publisher/broker/brokertest/log.go +++ b/libbeat/publisher/queue/queuetest/log.go @@ -1,4 +1,4 @@ -package brokertest +package queuetest import ( "bufio" diff --git a/libbeat/publisher/broker/brokertest/producer_cancel.go b/libbeat/publisher/queue/queuetest/producer_cancel.go similarity index 84% rename from libbeat/publisher/broker/brokertest/producer_cancel.go rename to libbeat/publisher/queue/queuetest/producer_cancel.go index c8db1f5f2d9..4340c12208f 100644 --- a/libbeat/publisher/broker/brokertest/producer_cancel.go +++ b/libbeat/publisher/queue/queuetest/producer_cancel.go @@ -1,4 +1,4 @@ -package brokertest +package queuetest import ( "testing" @@ -7,16 +7,16 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/publisher" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) // TestSingleProducerConsumer tests buffered events for a producer getting // cancelled will not be consumed anymore. Concurrent producer/consumer pairs // might still have active events not yet ACKed (not tested here). // -// Note: brokers not requiring consumers to ACK a events in order to +// Note: queues not requiring consumers to ACK a events in order to // return ACKs to the producer are not supported by this test. -func TestProducerCancelRemovesEvents(t *testing.T, factory BrokerFactory) { +func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) { fn := withLogOutput(func(t *testing.T) { var ( i int @@ -29,7 +29,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory BrokerFactory) { defer b.Close() log.Debug("create first producer") - producer := b.Producer(broker.ProducerConfig{ + producer := b.Producer(queue.ProducerConfig{ ACK: func(int) {}, // install function pointer, so 'cancel' will remove events DropOnCancel: true, }) @@ -47,7 +47,7 @@ func TestProducerCancelRemovesEvents(t *testing.T, factory BrokerFactory) { // reconnect and send some more events log.Debug("connect new producer") - producer = b.Producer(broker.ProducerConfig{}) + producer = b.Producer(queue.ProducerConfig{}) for ; i < N2; i++ { log.Debugf("send event %v to new producer", i) producer.Publish(makeEvent(common.MapStr{ diff --git a/libbeat/publisher/broker/brokertest/brokertest.go b/libbeat/publisher/queue/queuetest/queuetest.go similarity index 88% rename from libbeat/publisher/broker/brokertest/brokertest.go rename to libbeat/publisher/queue/queuetest/queuetest.go index 48920e7eda4..38c4b0c635c 100644 --- a/libbeat/publisher/broker/brokertest/brokertest.go +++ b/libbeat/publisher/queue/queuetest/queuetest.go @@ -1,21 +1,21 @@ -package brokertest +package queuetest import ( "sync" "testing" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/publisher/broker" + "github.com/elastic/beats/libbeat/publisher/queue" ) -type BrokerFactory func() broker.Broker +type QueueFactory func() queue.Queue -type workerFactory func(*sync.WaitGroup, interface{}, *TestLogger, broker.Broker) func() +type workerFactory func(*sync.WaitGroup, interface{}, *TestLogger, queue.Queue) func() func TestSingleProducerConsumer( t *testing.T, events, batchSize int, - factory BrokerFactory, + factory QueueFactory, ) { tests := []struct { name string @@ -49,9 +49,9 @@ func TestSingleProducerConsumer( log := NewTestLogger(t) log.Debug("run test: ", test.name) - broker := factory() + queue := factory() defer func() { - err := broker.Close() + err := queue.Close() if err != nil { t.Error(err) } @@ -59,8 +59,8 @@ func TestSingleProducerConsumer( var wg sync.WaitGroup - go test.producers(&wg, nil, log, broker)() - go test.consumers(&wg, nil, log, broker)() + go test.producers(&wg, nil, log, queue)() + go test.consumers(&wg, nil, log, queue)() wg.Wait() })) @@ -71,7 +71,7 @@ func TestSingleProducerConsumer( func TestMultiProducerConsumer( t *testing.T, events, batchSize int, - factory BrokerFactory, + factory QueueFactory, ) { tests := []struct { name string @@ -192,9 +192,9 @@ func TestMultiProducerConsumer( log := NewTestLogger(t) log.Debug("run test: ", test.name) - broker := factory() + queue := factory() defer func() { - err := broker.Close() + err := queue.Close() if err != nil { t.Error(err) } @@ -202,8 +202,8 @@ func TestMultiProducerConsumer( var wg sync.WaitGroup - go test.producers(&wg, nil, log, broker)() - go test.consumers(&wg, nil, log, broker)() + go test.producers(&wg, nil, log, queue)() + go test.consumers(&wg, nil, log, queue)() wg.Wait() })) @@ -213,10 +213,10 @@ func TestMultiProducerConsumer( func multiple( fns ...workerFactory, ) workerFactory { - return func(wg *sync.WaitGroup, info interface{}, log *TestLogger, broker broker.Broker) func() { + return func(wg *sync.WaitGroup, info interface{}, log *TestLogger, queue queue.Queue) func() { runners := make([]func(), len(fns)) for i, gen := range fns { - runners[i] = gen(wg, info, log, broker) + runners[i] = gen(wg, info, log, queue) } return func() { @@ -231,8 +231,8 @@ func makeProducer( maxEvents int, waitACK bool, makeFields func(int) common.MapStr, -) func(*sync.WaitGroup, interface{}, *TestLogger, broker.Broker) func() { - return func(wg *sync.WaitGroup, info interface{}, log *TestLogger, b broker.Broker) func() { +) func(*sync.WaitGroup, interface{}, *TestLogger, queue.Queue) func() { + return func(wg *sync.WaitGroup, info interface{}, log *TestLogger, b queue.Queue) func() { wg.Add(1) return func() { defer wg.Done() @@ -259,7 +259,7 @@ func makeProducer( } } - producer := b.Producer(broker.ProducerConfig{ + producer := b.Producer(queue.ProducerConfig{ ACK: ackCB, }) for i := 0; i < maxEvents; i++ { @@ -276,14 +276,14 @@ func makeConsumer(maxEvents, batchSize int) workerFactory { } func multiConsumer(numConsumers, maxEvents, batchSize int) workerFactory { - return func(wg *sync.WaitGroup, info interface{}, log *TestLogger, b broker.Broker) func() { + return func(wg *sync.WaitGroup, info interface{}, log *TestLogger, b queue.Queue) func() { wg.Add(1) return func() { defer wg.Done() var events sync.WaitGroup - consumers := make([]broker.Consumer, numConsumers) + consumers := make([]queue.Consumer, numConsumers) for i := range consumers { consumers[i] = b.Consumer() } diff --git a/libbeat/publisher/testing/testing.go b/libbeat/publisher/testing/testing.go index 38c808cff39..4228a8ca05f 100644 --- a/libbeat/publisher/testing/testing.go +++ b/libbeat/publisher/testing/testing.go @@ -2,7 +2,6 @@ package testing // ChanClient implements Client interface, forwarding published events to some import ( - "github.com/elastic/beats/libbeat/publisher/bc/publisher" "github.com/elastic/beats/libbeat/publisher/beat" ) @@ -16,15 +15,15 @@ type ChanClient struct { Channel chan beat.Event } -func PublisherWithClient(client beat.Client) publisher.Publisher { +func PublisherWithClient(client beat.Client) beat.Pipeline { return &TestPublisher{client} } -func (pub *TestPublisher) Connect() publisher.Client { - panic("Not supported") +func (pub *TestPublisher) Connect() (beat.Client, error) { + return pub.client, nil } -func (pub *TestPublisher) ConnectX(_ beat.ClientConfig) (beat.Client, error) { +func (pub *TestPublisher) ConnectWith(_ beat.ClientConfig) (beat.Client, error) { return pub.client, nil } diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index e9de37243b1..f2bd16c39a6 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -3,14 +3,13 @@ package module import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" "github.com/elastic/beats/libbeat/publisher/beat" ) // Connector configures ann establishes a beat.Client for publishing events // to the publisher pipeline. type Connector struct { - pipeline publisher.Publisher + pipeline beat.Pipeline processors *processors.Processors eventMeta common.EventMetadata } @@ -20,7 +19,7 @@ type connectorConfig struct { common.EventMetadata `config:",inline"` // Fields and tags to add to events. } -func NewConnector(pipeline publisher.Publisher, c *common.Config) (*Connector, error) { +func NewConnector(pipeline beat.Pipeline, c *common.Config) (*Connector, error) { config := connectorConfig{} if err := c.Unpack(&config); err != nil { return nil, err @@ -39,7 +38,7 @@ func NewConnector(pipeline publisher.Publisher, c *common.Config) (*Connector, e } func (c *Connector) Connect() (beat.Client, error) { - return c.pipeline.ConnectX(beat.ClientConfig{ + return c.pipeline.ConnectWith(beat.ClientConfig{ EventMetadata: c.eventMeta, Processor: c.processors, }) diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index b2123bd79d1..01e541a3ab3 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -8,19 +8,19 @@ import ( "github.com/elastic/beats/libbeat/cfgfile" "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/cfgwarn" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" + "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/metricbeat/mb" ) // Factory creates new Runner instances from configuration objects. // It is used to register and reload modules. type Factory struct { - pipeline publisher.Publisher + pipeline beat.Pipeline maxStartDelay time.Duration } // NewFactory creates new Reloader instance for the given config -func NewFactory(maxStartDelay time.Duration, p publisher.Publisher) *Factory { +func NewFactory(maxStartDelay time.Duration, p beat.Pipeline) *Factory { return &Factory{ pipeline: p, maxStartDelay: maxStartDelay, diff --git a/packetbeat/beater/packetbeat.go b/packetbeat/beater/packetbeat.go index 797ab398a6d..42669f12c3f 100644 --- a/packetbeat/beater/packetbeat.go +++ b/packetbeat/beater/packetbeat.go @@ -14,7 +14,6 @@ import ( "github.com/elastic/beats/libbeat/common/droppriv" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" pub "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/libbeat/service" "github.com/elastic/beats/packetbeat/config" @@ -36,7 +35,7 @@ type packetbeat struct { sniff *sniffer.SnifferSetup // publisher/pipeline - pipeline publisher.Publisher + pipeline pub.Pipeline transPub *publish.TransactionPublisher services []interface { @@ -104,6 +103,7 @@ func (pb *packetbeat) init(b *beat.Beat) error { pb.pipeline = b.Publisher pb.transPub, err = publish.NewTransactionPublisher( + b.Info.Name, b.Publisher, pb.config.IgnoreOutgoing, pb.config.Interfaces.File == "", @@ -220,7 +220,7 @@ func (pb *packetbeat) createWorker(dl layers.LinkType) (sniffer.Worker, error) { return nil, err } - client, err := pb.pipeline.ConnectX(pub.ClientConfig{ + client, err := pb.pipeline.ConnectWith(pub.ClientConfig{ EventMetadata: config.Flows.EventMetadata, Processor: processors, }) diff --git a/packetbeat/publish/publish.go b/packetbeat/publish/publish.go index f06aeff7c29..024c5e089b1 100644 --- a/packetbeat/publish/publish.go +++ b/packetbeat/publish/publish.go @@ -6,13 +6,12 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" "github.com/elastic/beats/libbeat/publisher/beat" ) type TransactionPublisher struct { done chan struct{} - pipeline publisher.Publisher + pipeline beat.Pipeline canDrop bool processor transProcessor } @@ -26,7 +25,8 @@ type transProcessor struct { var debugf = logp.MakeDebug("publish") func NewTransactionPublisher( - pipeline publisher.Publisher, + name string, + pipeline beat.Pipeline, ignoreOutgoing bool, canDrop bool, ) (*TransactionPublisher, error) { @@ -35,7 +35,6 @@ func NewTransactionPublisher( return nil, err } - name := pipeline.(*publisher.BeatPublisher).GetName() p := &TransactionPublisher{ done: make(chan struct{}), pipeline: pipeline, @@ -79,7 +78,7 @@ func (p *TransactionPublisher) CreateReporter( clientConfig.PublishMode = beat.DropIfFull } - client, err := p.pipeline.ConnectX(clientConfig) + client, err := p.pipeline.ConnectWith(clientConfig) if err != nil { return nil, err } diff --git a/packetbeat/publish/publish_test.go b/packetbeat/publish/publish_test.go index 19fed683089..a91d098aee0 100644 --- a/packetbeat/publish/publish_test.go +++ b/packetbeat/publish/publish_test.go @@ -9,7 +9,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" "github.com/elastic/beats/libbeat/publisher/beat" ) @@ -142,11 +141,6 @@ func TestDirectionIn(t *testing.T) { assert.True(t, event.Fields["direction"] == "in") } -func newTestPublisher() *publisher.BeatPublisher { - p := &publisher.BeatPublisher{} - return p -} - func TestNoDirection(t *testing.T) { processor := transProcessor{ localIPs: []string{"192.145.2.6"}, diff --git a/winlogbeat/beater/eventlogger.go b/winlogbeat/beater/eventlogger.go index e68068a88ee..5c53d5f7065 100644 --- a/winlogbeat/beater/eventlogger.go +++ b/winlogbeat/beater/eventlogger.go @@ -6,7 +6,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/processors" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/winlogbeat/checkpoint" "github.com/elastic/beats/winlogbeat/eventlog" @@ -44,9 +43,9 @@ func newEventLogger( }, nil } -func (e *eventLogger) connect(pipeline publisher.Publisher) (beat.Client, error) { +func (e *eventLogger) connect(pipeline beat.Pipeline) (beat.Client, error) { api := e.source.Name() - return pipeline.ConnectX(beat.ClientConfig{ + return pipeline.ConnectWith(beat.ClientConfig{ PublishMode: beat.GuaranteedSend, EventMetadata: e.eventMeta, Meta: nil, // TODO: configure modules/ES ingest pipeline? @@ -60,7 +59,7 @@ func (e *eventLogger) connect(pipeline publisher.Publisher) (beat.Client, error) func (e *eventLogger) run( done <-chan struct{}, - pipeline publisher.Publisher, + pipeline beat.Pipeline, state checkpoint.EventLogState, ) { api := e.source diff --git a/winlogbeat/beater/winlogbeat.go b/winlogbeat/beater/winlogbeat.go index 8ab5f7abd06..32c9ba95e08 100644 --- a/winlogbeat/beater/winlogbeat.go +++ b/winlogbeat/beater/winlogbeat.go @@ -14,7 +14,6 @@ import ( "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/paths" - "github.com/elastic/beats/libbeat/publisher/bc/publisher" pub "github.com/elastic/beats/libbeat/publisher/beat" "github.com/elastic/beats/winlogbeat/checkpoint" @@ -40,7 +39,7 @@ type Winlogbeat struct { config *config.Settings // Configuration settings. eventLogs []*eventLogger // List of all event logs being monitored. done chan struct{} // Channel to initiate shutdown of main event loop. - pipeline publisher.Publisher // Interface to publish event. + pipeline pub.Pipeline // Interface to publish event. checkpoint *checkpoint.Checkpoint // Persists event log state to disk. }