Move filebeat to new publisher pipeline #4644

Merged
merged 2 commits on Jul 11, 2017
41 changes: 41 additions & 0 deletions filebeat/beater/acker.go
@@ -0,0 +1,41 @@
package beater

import (
	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/publisher/beat"
)

// eventACKer handles publisher pipeline ACKs and forwards
// them to the registrar.
type eventACKer struct {
	out successLogger
}

type successLogger interface {
	Published(events []*util.Data) bool
}

func newEventACKer(out successLogger) *eventACKer {
	return &eventACKer{out: out}
}

func (a *eventACKer) ackEvents(events []beat.Event) {
	data := make([]*util.Data, 0, len(events))
	for _, event := range events {
		p := event.Private
		if p == nil {
			continue
		}

		datum, ok := p.(*util.Data)
		if !ok || !datum.HasState() {
			continue
		}

		data = append(data, datum)
	}

	if len(data) > 0 {
		a.out.Published(data)
	}
}
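The acker only forwards events that carry file state: each event's Private field is expected to hold a *util.Data set by a harvester, and anything else is dropped before the batch reaches the registrar. A minimal sketch of that flow inside package beater (registrarChannel implements successLogger as in filebeat.go below; statefulData is a hypothetical *util.Data used only for illustration):

	// Illustrative fragment, not part of this PR.
	acker := newEventACKer(registrarChannel)

	// Assume statefulData is a *util.Data whose HasState() returns true,
	// e.g. because a harvester stored a file.State update in it.
	events := []beat.Event{
		{},                         // no Private payload -> ignored
		{Private: "not util.Data"}, // wrong type -> ignored
		{Private: statefulData},    // stateful *util.Data -> forwarded to the registrar
	}

	// Invoked by the publisher pipeline once the output has ACKed these events.
	acker.ackEvents(events)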
72 changes: 0 additions & 72 deletions filebeat/beater/channels.go
@@ -2,26 +2,11 @@ package beater

import (
	"sync"
	"sync/atomic"

	"github.com/elastic/beats/filebeat/registrar"
	"github.com/elastic/beats/filebeat/spooler"
	"github.com/elastic/beats/filebeat/util"
)

type spoolerOutlet struct {
	wg      *sync.WaitGroup
	done    <-chan struct{}
	spooler *spooler.Spooler

	isOpen int32 // atomic indicator
}

type publisherChannel struct {
	done chan struct{}
	ch   chan []*util.Data
}

type registrarLogger struct {
	done chan struct{}
	ch   chan<- []*util.Data
@@ -31,63 +16,6 @@ type finishedLogger struct {
	wg *sync.WaitGroup
}

func newSpoolerOutlet(
	done <-chan struct{},
	s *spooler.Spooler,
	wg *sync.WaitGroup,
) *spoolerOutlet {
	return &spoolerOutlet{
		done:    done,
		spooler: s,
		wg:      wg,
		isOpen:  1,
	}
}

func (o *spoolerOutlet) OnEvent(data *util.Data) bool {
	open := atomic.LoadInt32(&o.isOpen) == 1
	if !open {
		return false
	}

	if o.wg != nil {
		o.wg.Add(1)
	}

	select {
	case <-o.done:
		if o.wg != nil {
			o.wg.Done()
		}
		atomic.StoreInt32(&o.isOpen, 0)
		return false
	case o.spooler.Channel <- data:
		return true
	}
}

func newPublisherChannel() *publisherChannel {
	return &publisherChannel{
		done: make(chan struct{}),
		ch:   make(chan []*util.Data, 1),
	}
}

func (c *publisherChannel) Close() { close(c.done) }
func (c *publisherChannel) Send(events []*util.Data) bool {
	select {
	case <-c.done:
		// set ch to nil, so no more events will be send after channel close signal
		// has been processed the first time.
		// Note: nil channels will block, so only done channel will be actively
		// report 'closed'.
		c.ch = nil
		return false
	case c.ch <- events:
		return true
	}
}

func newRegistrarLogger(reg *registrar.Registrar) *registrarLogger {
	return &registrarLogger{
		done: make(chan struct{}),
45 changes: 16 additions & 29 deletions filebeat/beater/filebeat.go
@@ -11,14 +11,13 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
pub "github.com/elastic/beats/libbeat/publisher/beat"

"github.com/elastic/beats/filebeat/channel"
cfg "github.com/elastic/beats/filebeat/config"
"github.com/elastic/beats/filebeat/crawler"
"github.com/elastic/beats/filebeat/fileset"
"github.com/elastic/beats/filebeat/publisher"
"github.com/elastic/beats/filebeat/registrar"
"github.com/elastic/beats/filebeat/spooler"

// Add filebeat level processors
_ "github.com/elastic/beats/filebeat/processor/add_kubernetes_metadata"
@@ -165,21 +164,21 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
	// Make sure all events that were published in
	registrarChannel := newRegistrarLogger(registrar)

	// Channel from spooler to harvester
	publisherChan := newPublisherChannel()

	// Publishes event to output
	publisher := publisher.New(config.PublishAsync, publisherChan.ch, registrarChannel, b.Publisher)

	// Init and Start spooler: Harvesters dump events into the spooler.
	spooler, err := spooler.New(config, publisherChan)
	err = b.Publisher.SetACKHandler(pub.PipelineACKHandler{
		ACKEvents: newEventACKer(registrarChannel).ackEvents,
	})
	if err != nil {
		logp.Err("Could not init spooler: %v", err)
		logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
		return err
	}

	outlet := channel.NewOutlet(fb.done, spooler.Channel, wgEvents)
	crawler, err := crawler.New(outlet, config.Prospectors, b.Info.Version, fb.done, *once)
	outDone := make(chan struct{}) // outDone closes down all active pipeline connections
	crawler, err := crawler.New(
		channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,
		config.Prospectors,
		b.Info.Version,
		fb.done,
		*once)
	if err != nil {
		logp.Err("Could not init crawler: %v", err)
		return err
@@ -194,32 +193,20 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
	if err != nil {
		return fmt.Errorf("Could not start registrar: %v", err)
	}

	// Stopping registrar will write last state
	defer registrar.Stop()

	// Start publisher
	publisher.Start()
	// Stopping publisher (might potentially drop items)
	defer func() {
		// Close the registrar logger first to make sure no more events arrive at the registrar
		// registrarChannel must be closed first to potentially unblock (pretty unlikely) the publisher
		registrarChannel.Close()
		publisher.Stop()
		close(outDone) // finally close all active connections to publisher pipeline
	}()

	// Starting spooler
	spooler.Start()

	// Stopping spooler will flush items
	defer func() {
		// Wait for all events to be processed or timeout
		waitEvents.Wait()

		// Closes publisher so no further events can be sent
		publisherChan.Close()
		// Stopping spooler
		spooler.Stop()
	}()
	// Wait for all events to be processed or timeout
	defer waitEvents.Wait()

	// Create an ES connection factory for dynamic modules pipeline loading
	var pipelineLoaderFactory fileset.PipelineLoaderFactory
128 changes: 128 additions & 0 deletions filebeat/channel/factory.go
@@ -0,0 +1,128 @@
package channel

import (
	"sync"

	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/processors"
	"github.com/elastic/beats/libbeat/publisher/bc/publisher"
	"github.com/elastic/beats/libbeat/publisher/beat"
)

type OutletFactory struct {
	done     <-chan struct{}
	pipeline publisher.Publisher

	eventer  beat.ClientEventer
	wgEvents *sync.WaitGroup
}

// clientEventer adjusts wgEvents if events are dropped during shutdown.
type clientEventer struct {
	wgEvents *sync.WaitGroup
}

// prospectorOutletConfig defines common prospector settings
// for the publisher pipeline.
type prospectorOutletConfig struct {
	// event processing
	common.EventMetadata `config:",inline"` // Fields and tags to add to events.
	Processors           processors.PluginConfig `config:"processors"`

	// implicit event fields
	Type string `config:"type"` // prospector.type

	// hidden filebeat modules settings
	Module  string `config:"_module_name"`  // hidden setting
	Fileset string `config:"_fileset_name"` // hidden setting

	// Output meta data settings
	Pipeline string `config:"pipeline"` // ES Ingest pipeline name
}

// NewOutletFactory creates a new outlet factory for
// connecting a prospector to the publisher pipeline.
func NewOutletFactory(
	done <-chan struct{},
	pipeline publisher.Publisher,
	wgEvents *sync.WaitGroup,
) *OutletFactory {
	o := &OutletFactory{
		done:     done,
		pipeline: pipeline,
		wgEvents: wgEvents,
	}

	if wgEvents != nil {
		o.eventer = &clientEventer{wgEvents}
	}

	return o
}

// Create builds a new Outleter, while applying common prospector settings.
// Prospectors and all harvesters use the same pipeline client instance.
// This guarantees ordering between events as required by the registrar for
// file.State updates.
func (f *OutletFactory) Create(cfg *common.Config) (Outleter, error) {
	config := prospectorOutletConfig{}
	if err := cfg.Unpack(&config); err != nil {
		return nil, err
	}

	processors, err := processors.New(config.Processors)
	if err != nil {
		return nil, err
	}

	setMeta := func(to common.MapStr, key, value string) {
		if value != "" {
			to[key] = value
		}
	}

	meta := common.MapStr{}
	setMeta(meta, "pipeline", config.Pipeline)

	fields := common.MapStr{}
	setMeta(fields, "module", config.Module)
	setMeta(fields, "name", config.Fileset)
	if len(fields) > 0 {
		fields = common.MapStr{
			"fileset": fields,
		}
	}
	if config.Type != "" {
		fields["prospector"] = common.MapStr{
			"type": config.Type,
		}
	}

	client, err := f.pipeline.ConnectX(beat.ClientConfig{
		PublishMode:   beat.GuaranteedSend,
		EventMetadata: config.EventMetadata,
		Meta:          meta,
		Fields:        fields,
		Processor:     processors,
		Events:        f.eventer,
	})
	if err != nil {
		return nil, err
	}

	outlet := newOutlet(client, f.wgEvents)
	if f.done != nil {
		return CloseOnSignal(outlet, f.done), nil
	}
	return outlet, nil
}

func (*clientEventer) Closing()   {}
func (*clientEventer) Closed()    {}
func (*clientEventer) Published() {}

func (c *clientEventer) FilteredOut(_ beat.Event) {}
func (c *clientEventer) DroppedOnPublish(_ beat.Event) {
	c.wgEvents.Done()
}
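For context, this factory is what the crawler now hands to each prospector: Create unpacks the prospector config, connects a pipeline client with GuaranteedSend, and wraps it in an outlet. A rough usage sketch, assuming it runs where outDone, a publisher.Publisher and wgEvents are already available (as in Filebeat.Run above); the config values are made up for illustration:

	// Illustrative fragment only; not code from this PR.
	factory := channel.NewOutletFactory(outDone, pipeline, wgEvents)

	cfg, err := common.NewConfigFrom(map[string]interface{}{
		"type":     "log",
		"pipeline": "nginx-access", // forwarded to ES as ingest pipeline metadata
		"fields":   map[string]interface{}{"env": "staging"},
	})
	if err != nil {
		return err
	}

	// The crawler calls factory.Create once per configured prospector; the
	// prospector and all of its harvesters then share this single outlet/client.
	outlet, err := factory.Create(cfg)
	if err != nil {
		return err
	}
	defer outlet.Close()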
12 changes: 8 additions & 4 deletions filebeat/channel/interface.go
@@ -1,11 +1,15 @@
package channel

import "github.com/elastic/beats/filebeat/util"
import (
	"github.com/elastic/beats/filebeat/util"
	"github.com/elastic/beats/libbeat/common"
)

// OutleterFactory is used to create a new Outleter instance
type OutleterFactory func(*common.Config) (Outleter, error)

// Outleter is the outlet for a prospector
type Outleter interface {
	SetSignal(signal <-chan struct{})
	OnEventSignal(data *util.Data) bool
	Close() error
	OnEvent(data *util.Data) bool
	Copy() Outleter
}
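The crawler now receives a value of this function type (factory.Create in filebeat.go above) instead of a ready-made outlet. A hypothetical consumer, only to show the shape of the new contract; the function name and loop are illustrative, not part of this PR:

	// Each prospector gets its own Outleter, created from its own *common.Config.
	func startProspectors(factory OutleterFactory, configs []*common.Config) ([]Outleter, error) {
		outlets := make([]Outleter, 0, len(configs))
		for _, c := range configs {
			outlet, err := factory(c)
			if err != nil {
				return nil, err
			}
			outlets = append(outlets, outlet)
		}
		return outlets, nil
	}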