Skip to content

Commit

Permalink
Pipeline cleanups (elastic#4776)
Browse files Browse the repository at this point in the history
* Rename publisher/broker package to publisher/queue
* Move global pipeline loading to the pipeline package
  • Loading branch information
Steffen Siering authored and tsg committed Jul 31, 2017
1 parent ca658e6 commit c79b09f
Show file tree
Hide file tree
Showing 50 changed files with 418 additions and 1,079 deletions.
7 changes: 3 additions & 4 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package channel
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"
)

type OutletFactory struct {
done <-chan struct{}
pipeline publisher.Publisher
pipeline beat.Pipeline

eventer beat.ClientEventer
wgEvents eventCounter
Expand Down Expand Up @@ -48,7 +47,7 @@ type prospectorOutletConfig struct {
// connecting a prospector to the publisher pipeline.
func NewOutletFactory(
done <-chan struct{},
pipeline publisher.Publisher,
pipeline beat.Pipeline,
wgEvents eventCounter,
) *OutletFactory {
o := &OutletFactory{
Expand Down Expand Up @@ -102,7 +101,7 @@ func (f *OutletFactory) Create(cfg *common.Config) (Outleter, error) {
}
}

client, err := f.pipeline.ConnectX(beat.ClientConfig{
client, err := f.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: config.EventMetadata,
Meta: meta,
Expand Down
23 changes: 15 additions & 8 deletions generator/beat/{beat}/beater/{beat}.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
pub "github.com/elastic/beats/libbeat/publisher/beat"

"{beat_path}/config"
)

type {Beat} struct {
done chan struct{}
config config.Config
client publisher.Client
client pub.Client
}

// Creates beater
Expand All @@ -35,7 +35,12 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
func (bt *{Beat}) Run(b *beat.Beat) error {
logp.Info("{beat} is running! Hit CTRL-C to stop it.")

bt.client = b.Publisher.Connect()
var err error
bt.client, err = b.Publisher.Connect()
if err != nil {
return err
}

ticker := time.NewTicker(bt.config.Period)
counter := 1
for {
Expand All @@ -45,12 +50,14 @@ func (bt *{Beat}) Run(b *beat.Beat) error {
case <-ticker.C:
}

event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": b.Info.Name,
"counter": counter,
event := pub.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"type": b.Info.Name,
"counter": counter,
},
}
bt.client.PublishEvent(event)
bt.client.Publish(event)
logp.Info("Event sent")
counter++
}
Expand Down
7 changes: 3 additions & 4 deletions heartbeat/beater/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"

"github.com/elastic/beats/heartbeat/monitors"
Expand All @@ -35,7 +34,7 @@ type monitor struct {

active map[string]monitorTask

pipeline publisher.Publisher
pipeline beat.Pipeline
}

type monitorTask struct {
Expand Down Expand Up @@ -66,7 +65,7 @@ var defaultFilePollInterval = 5 * time.Second
const defaultEventType = "monitor"

func newMonitorManager(
pipeline publisher.Publisher,
pipeline beat.Pipeline,
jobControl jobControl,
registry *monitors.Registrar,
configs []*common.Config,
Expand Down Expand Up @@ -207,7 +206,7 @@ func (m *monitor) Update(configs []*common.Config) error {
}

// create connection per monitorTask
client, err := m.pipeline.ConnectX(beat.ClientConfig{
client, err := m.pipeline.ConnectWith(beat.ClientConfig{
EventMetadata: t.config.EventMetadata,
Processor: processors,
})
Expand Down
63 changes: 32 additions & 31 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ import (
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"
"github.com/elastic/beats/libbeat/publisher/pipeline"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/template"
"github.com/elastic/beats/libbeat/version"
Expand Down Expand Up @@ -108,27 +108,35 @@ type SetupMLCallback func(*Beat) error
// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
Info common.BeatInfo // beat metadata.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher publisher.Publisher // Publisher
Info common.BeatInfo // beat metadata.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher beat.Pipeline // Publisher pipeline

SetupMLCallback SetupMLCallback // setup callback for ML job configs
InSetupCmd bool // this is set to true when the `setup` command is called
}

// BeatConfig struct contains the basic configuration of every beat
type BeatConfig struct {
Shipper publisher.ShipperConfig `config:",inline"`
Output common.ConfigNamespace `config:"output"`
Monitoring *common.Config `config:"xpack.monitoring"`
Logging logp.Logging `config:"logging"`
Processors processors.PluginConfig `config:"processors"`
Path paths.Path `config:"path"`
Dashboards *common.Config `config:"setup.dashboards"`
Template *common.Config `config:"setup.template"`
Kibana *common.Config `config:"setup.kibana"`
Http *common.Config `config:"http"`
// beat top-level settings
Name string `config:"name"`
MaxProcs int `config:"max_procs"`

// beat internal components configurations
HTTP *common.Config `config:"http"`
Path paths.Path `config:"path"`
Logging logp.Logging `config:"logging"`

// output/publishing related configurations
Pipeline pipeline.Config `config:",inline"`
Monitoring *common.Config `config:"xpack.monitoring"`
Output common.ConfigNamespace `config:"output"`

// 'setup' configurations
Dashboards *common.Config `config:"setup.dashboards"`
Template *common.Config `config:"setup.template"`
Kibana *common.Config `config:"setup.kibana"`
}

var (
Expand Down Expand Up @@ -238,27 +246,23 @@ func (b *Beat) createBeater(bt Creator) (Beater, error) {
}

logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version)
processors, err := processors.New(b.Config.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
}

err = b.registerTemplateLoading()
if err != nil {
return nil, err
}

debugf("Initializing output plugins")
publisher, err := publisher.New(b.Info, b.Config.Output, b.Config.Shipper, processors)
pipeline, err := pipeline.Load(b.Info, b.Config.Pipeline, b.Config.Output)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %v", err)
}

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer publisher.Stop()
// defer pipeline.Close()

b.Publisher = publisher
b.Publisher = pipeline
beater, err := bt(b, sub)
if err != nil {
return nil, err
Expand Down Expand Up @@ -317,8 +321,8 @@ func (b *Beat) launch(bt Creator) error {
defer logp.Info("%s stopped.", b.Info.Beat)
defer logp.LogTotalExpvars(&b.Config.Logging)

if b.Config.Http.Enabled() {
api.Start(b.Config.Http, b.Info)
if b.Config.HTTP.Enabled() {
api.Start(b.Config.HTTP, b.Info)
}

return beater.Run(b)
Expand Down Expand Up @@ -458,7 +462,7 @@ func (b *Beat) configure() error {
return err
}

if name := b.Config.Shipper.Name; name != "" {
if name := b.Config.Name; name != "" {
b.Info.Name = name
}

Expand All @@ -482,11 +486,8 @@ func (b *Beat) configure() error {

logp.Info("Beat UUID: %v", b.Info.UUID)

if b.Config.Shipper.MaxProcs != nil {
maxProcs := *b.Config.Shipper.MaxProcs
if maxProcs > 0 {
runtime.GOMAXPROCS(maxProcs)
}
if maxProcs := b.Config.MaxProcs; maxProcs > 0 {
runtime.GOMAXPROCS(maxProcs)
}

return nil
Expand Down
21 changes: 14 additions & 7 deletions libbeat/mock/mockbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
pub "github.com/elastic/beats/libbeat/publisher/beat"
)

///*** Mock Beat Setup ***///
Expand All @@ -16,7 +16,7 @@ var Name = "mockbeat"

type Mockbeat struct {
done chan struct{}
client publisher.Client
client pub.Client
}

// Creates beater
Expand All @@ -29,13 +29,20 @@ func New(b *beat.Beat, _ *common.Config) (beat.Beater, error) {
/// *** Beater interface methods ***///

func (mb *Mockbeat) Run(b *beat.Beat) error {
mb.client = b.Publisher.Connect()
var err error

mb.client, err = b.Publisher.Connect()
if err != nil {
return err
}

// Wait until mockbeat is done
mb.client.PublishEvent(common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": "mock",
"message": "Mockbeat is alive!",
mb.client.Publish(pub.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"type": "mock",
"message": "Mockbeat is alive!",
},
})
<-mb.done
return nil
Expand Down
10 changes: 5 additions & 5 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/libbeat/publisher/beat"
"github.com/elastic/beats/libbeat/publisher/broker"
"github.com/elastic/beats/libbeat/publisher/broker/membroker"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/queue"
"github.com/elastic/beats/libbeat/publisher/queue/memqueue"
)

type reporter struct {
Expand Down Expand Up @@ -104,8 +104,8 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er
out.Clients = append(out.Clients, client)
}

brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
return membroker.NewBroker(membroker.Settings{
queueFactory := func(e queue.Eventer) (queue.Queue, error) {
return memqueue.NewBroker(memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
Expand All @@ -115,7 +115,7 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er

pipeline, err := pipeline.New(
monitoring,
brokerFactory, out, pipeline.Settings{
queueFactory, out, pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
})
Expand Down
Loading

0 comments on commit c79b09f

Please sign in to comment.