Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline cleanups #4776

Merged
merged 8 commits into from
Jul 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions filebeat/channel/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package channel
import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"
)

type OutletFactory struct {
done <-chan struct{}
pipeline publisher.Publisher
pipeline beat.Pipeline

eventer beat.ClientEventer
wgEvents eventCounter
Expand Down Expand Up @@ -48,7 +47,7 @@ type prospectorOutletConfig struct {
// connecting a prospector to the publisher pipeline.
func NewOutletFactory(
done <-chan struct{},
pipeline publisher.Publisher,
pipeline beat.Pipeline,
wgEvents eventCounter,
) *OutletFactory {
o := &OutletFactory{
Expand Down Expand Up @@ -102,7 +101,7 @@ func (f *OutletFactory) Create(cfg *common.Config) (Outleter, error) {
}
}

client, err := f.pipeline.ConnectX(beat.ClientConfig{
client, err := f.pipeline.ConnectWith(beat.ClientConfig{
PublishMode: beat.GuaranteedSend,
EventMetadata: config.EventMetadata,
Meta: meta,
Expand Down
23 changes: 15 additions & 8 deletions generator/beat/{beat}/beater/{beat}.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
pub "github.com/elastic/beats/libbeat/publisher/beat"

"{beat_path}/config"
)

type {Beat} struct {
done chan struct{}
config config.Config
client publisher.Client
client pub.Client
}

// Creates beater
Expand All @@ -35,7 +35,12 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
func (bt *{Beat}) Run(b *beat.Beat) error {
logp.Info("{beat} is running! Hit CTRL-C to stop it.")

bt.client = b.Publisher.Connect()
var err error
bt.client, err = b.Publisher.Connect()
if err != nil {
return err
}

ticker := time.NewTicker(bt.config.Period)
counter := 1
for {
Expand All @@ -45,12 +50,14 @@ func (bt *{Beat}) Run(b *beat.Beat) error {
case <-ticker.C:
}

event := common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": b.Info.Name,
"counter": counter,
event := pub.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"type": b.Info.Name,
"counter": counter,
},
}
bt.client.PublishEvent(event)
bt.client.Publish(event)
logp.Info("Event sent")
counter++
}
Expand Down
7 changes: 3 additions & 4 deletions heartbeat/beater/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"

"github.com/elastic/beats/heartbeat/monitors"
Expand All @@ -35,7 +34,7 @@ type monitor struct {

active map[string]monitorTask

pipeline publisher.Publisher
pipeline beat.Pipeline
}

type monitorTask struct {
Expand Down Expand Up @@ -66,7 +65,7 @@ var defaultFilePollInterval = 5 * time.Second
const defaultEventType = "monitor"

func newMonitorManager(
pipeline publisher.Publisher,
pipeline beat.Pipeline,
jobControl jobControl,
registry *monitors.Registrar,
configs []*common.Config,
Expand Down Expand Up @@ -207,7 +206,7 @@ func (m *monitor) Update(configs []*common.Config) error {
}

// create connection per monitorTask
client, err := m.pipeline.ConnectX(beat.ClientConfig{
client, err := m.pipeline.ConnectWith(beat.ClientConfig{
EventMetadata: t.config.EventMetadata,
Processor: processors,
})
Expand Down
63 changes: 32 additions & 31 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ import (
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"
"github.com/elastic/beats/libbeat/publisher/pipeline"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/template"
"github.com/elastic/beats/libbeat/version"
Expand Down Expand Up @@ -108,27 +108,35 @@ type SetupMLCallback func(*Beat) error
// Beat contains the basic beat data and the publisher client used to publish
// events.
type Beat struct {
Info common.BeatInfo // beat metadata.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher publisher.Publisher // Publisher
Info common.BeatInfo // beat metadata.
RawConfig *common.Config // Raw config that can be unpacked to get Beat specific config data.
Config BeatConfig // Common Beat configuration data.
Publisher beat.Pipeline // Publisher pipeline

SetupMLCallback SetupMLCallback // setup callback for ML job configs
InSetupCmd bool // this is set to true when the `setup` command is called
}

// BeatConfig struct contains the basic configuration of every beat
type BeatConfig struct {
Shipper publisher.ShipperConfig `config:",inline"`
Output common.ConfigNamespace `config:"output"`
Monitoring *common.Config `config:"xpack.monitoring"`
Logging logp.Logging `config:"logging"`
Processors processors.PluginConfig `config:"processors"`
Path paths.Path `config:"path"`
Dashboards *common.Config `config:"setup.dashboards"`
Template *common.Config `config:"setup.template"`
Kibana *common.Config `config:"setup.kibana"`
Http *common.Config `config:"http"`
// beat top-level settings
Name string `config:"name"`
MaxProcs int `config:"max_procs"`

// beat internal components configurations
HTTP *common.Config `config:"http"`
Path paths.Path `config:"path"`
Logging logp.Logging `config:"logging"`

// output/publishing related configurations
Pipeline pipeline.Config `config:",inline"`
Monitoring *common.Config `config:"xpack.monitoring"`
Output common.ConfigNamespace `config:"output"`

// 'setup' configurations
Dashboards *common.Config `config:"setup.dashboards"`
Template *common.Config `config:"setup.template"`
Kibana *common.Config `config:"setup.kibana"`
}

var (
Expand Down Expand Up @@ -238,27 +246,23 @@ func (b *Beat) createBeater(bt Creator) (Beater, error) {
}

logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version)
processors, err := processors.New(b.Config.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
}

err = b.registerTemplateLoading()
if err != nil {
return nil, err
}

debugf("Initializing output plugins")
publisher, err := publisher.New(b.Info, b.Config.Output, b.Config.Shipper, processors)
pipeline, err := pipeline.Load(b.Info, b.Config.Pipeline, b.Config.Output)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %v", err)
}

// TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet,
// but refine publisher to disconnect clients on stop automatically
// defer publisher.Stop()
// defer pipeline.Close()

b.Publisher = publisher
b.Publisher = pipeline
beater, err := bt(b, sub)
if err != nil {
return nil, err
Expand Down Expand Up @@ -317,8 +321,8 @@ func (b *Beat) launch(bt Creator) error {
defer logp.Info("%s stopped.", b.Info.Beat)
defer logp.LogTotalExpvars(&b.Config.Logging)

if b.Config.Http.Enabled() {
api.Start(b.Config.Http, b.Info)
if b.Config.HTTP.Enabled() {
api.Start(b.Config.HTTP, b.Info)
}

return beater.Run(b)
Expand Down Expand Up @@ -458,7 +462,7 @@ func (b *Beat) configure() error {
return err
}

if name := b.Config.Shipper.Name; name != "" {
if name := b.Config.Name; name != "" {
b.Info.Name = name
}

Expand All @@ -482,11 +486,8 @@ func (b *Beat) configure() error {

logp.Info("Beat UUID: %v", b.Info.UUID)

if b.Config.Shipper.MaxProcs != nil {
maxProcs := *b.Config.Shipper.MaxProcs
if maxProcs > 0 {
runtime.GOMAXPROCS(maxProcs)
}
if maxProcs := b.Config.MaxProcs; maxProcs > 0 {
runtime.GOMAXPROCS(maxProcs)
}

return nil
Expand Down
21 changes: 14 additions & 7 deletions libbeat/mock/mockbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
pub "github.com/elastic/beats/libbeat/publisher/beat"
)

///*** Mock Beat Setup ***///
Expand All @@ -16,7 +16,7 @@ var Name = "mockbeat"

type Mockbeat struct {
done chan struct{}
client publisher.Client
client pub.Client
}

// Creates beater
Expand All @@ -29,13 +29,20 @@ func New(b *beat.Beat, _ *common.Config) (beat.Beater, error) {
/// *** Beater interface methods ***///

func (mb *Mockbeat) Run(b *beat.Beat) error {
mb.client = b.Publisher.Connect()
var err error

mb.client, err = b.Publisher.Connect()
if err != nil {
return err
}

// Wait until mockbeat is done
mb.client.PublishEvent(common.MapStr{
"@timestamp": common.Time(time.Now()),
"type": "mock",
"message": "Mockbeat is alive!",
mb.client.Publish(pub.Event{
Timestamp: time.Now(),
Fields: common.MapStr{
"type": "mock",
"message": "Mockbeat is alive!",
},
})
<-mb.done
return nil
Expand Down
10 changes: 5 additions & 5 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ import (
"github.com/elastic/beats/libbeat/outputs/outil"
"github.com/elastic/beats/libbeat/outputs/transport"
"github.com/elastic/beats/libbeat/publisher/beat"
"github.com/elastic/beats/libbeat/publisher/broker"
"github.com/elastic/beats/libbeat/publisher/broker/membroker"
"github.com/elastic/beats/libbeat/publisher/pipeline"
"github.com/elastic/beats/libbeat/publisher/queue"
"github.com/elastic/beats/libbeat/publisher/queue/memqueue"
)

type reporter struct {
Expand Down Expand Up @@ -104,8 +104,8 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er
out.Clients = append(out.Clients, client)
}

brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
return membroker.NewBroker(membroker.Settings{
queueFactory := func(e queue.Eventer) (queue.Queue, error) {
return memqueue.NewBroker(memqueue.Settings{
Eventer: e,
Events: 20,
}), nil
Expand All @@ -115,7 +115,7 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er

pipeline, err := pipeline.New(
monitoring,
brokerFactory, out, pipeline.Settings{
queueFactory, out, pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
})
Expand Down
Loading