Skip to content

Commit

Permalink
Move global pipeline loading to pipeline module
Browse files Browse the repository at this point in the history
  • Loading branch information
urso committed Jul 28, 2017
1 parent ce78d49 commit 1f4ef9a
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 185 deletions.
10 changes: 5 additions & 5 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ import (
"github.com/elastic/beats/libbeat/outputs/elasticsearch"
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
"github.com/elastic/beats/libbeat/publisher/beat"
"github.com/elastic/beats/libbeat/publisher/pipeline"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/template"
"github.com/elastic/beats/libbeat/version"
Expand Down Expand Up @@ -129,9 +129,9 @@ type BeatConfig struct {
Logging logp.Logging `config:"logging"`

// output/publishing related configurations
Shipper publisher.ShipperConfig `config:",inline"`
Monitoring *common.Config `config:"xpack.monitoring"`
Output common.ConfigNamespace `config:"output"`
Pipeline pipeline.Config `config:",inline"`
Monitoring *common.Config `config:"xpack.monitoring"`
Output common.ConfigNamespace `config:"output"`

// 'setup' configurations
Dashboards *common.Config `config:"setup.dashboards"`
Expand Down Expand Up @@ -253,7 +253,7 @@ func (b *Beat) createBeater(bt Creator) (Beater, error) {
}

debugf("Initializing output plugins")
pipeline, err := publisher.New(b.Info, b.Config.Output, b.Config.Shipper)
pipeline, err := pipeline.Load(b.Info, b.Config.Pipeline, b.Config.Output)
if err != nil {
return nil, fmt.Errorf("error initializing publisher: %v", err)
}
Expand Down
94 changes: 0 additions & 94 deletions libbeat/publisher/bc/publisher/pipeline.go

This file was deleted.

47 changes: 0 additions & 47 deletions libbeat/publisher/bc/publisher/publish.go

This file was deleted.

11 changes: 7 additions & 4 deletions libbeat/publisher/pipeline/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ package pipeline
import (
"errors"
"fmt"
"time"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/beat"
)

// Config object for loading a pipeline instance via Load.
type Config struct {
WaitShutdown time.Duration `config:"wait_shutdown"`
Queue common.ConfigNamespace `config:"queue"`
Output common.ConfigNamespace `config:"output"`
// Event processing configurations
common.EventMetadata `config:",inline"` // Fields and tags to add to each event.
Processors processors.PluginConfig `config:"processors"`

// Event queue
Queue common.ConfigNamespace `config:"queue"`
}

// validateClientConfig checks a ClientConfig can be used with (*Pipeline).ConnectWith.
Expand Down
131 changes: 131 additions & 0 deletions libbeat/publisher/pipeline/module.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package pipeline

import (
"errors"
"flag"
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/queue"
)

// Global pipeline module for loading the main pipeline from a configuration object

// command line flags
var publishDisabled = false

const defaultQueueType = "mem"

func init() {
flag.BoolVar(&publishDisabled, "N", false, "Disable actual publishing for testing")
}

// Load uses a Config object to create a new complete Pipeline instance with
// configured queue and outputs.
func Load(
beatInfo common.BeatInfo,
config Config,
outcfg common.ConfigNamespace,
) (*Pipeline, error) {
if publishDisabled {
logp.Info("Dry run mode. All output types except the file based one are disabled.")
}

processors, err := processors.New(config.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
}

reg := monitoring.Default.GetRegistry("libbeat")
if reg == nil {
reg = monitoring.Default.NewRegistry("libbeat")
}

name := beatInfo.Name
settings := Settings{
WaitClose: 0,
WaitCloseMode: NoWaitOnClose,
Disabled: publishDisabled,
Processors: processors,
Annotations: Annotations{
Event: config.EventMetadata,
Beat: common.MapStr{
"name": name,
"hostname": beatInfo.Hostname,
"version": beatInfo.Version,
},
},
}

queueBuilder, err := createQueueBuilder(config.Queue)
if err != nil {
return nil, err
}

out, err := loadOutput(beatInfo, reg, outcfg)
if err != nil {
return nil, err
}

p, err := New(reg, queueBuilder, out, settings)
if err != nil {
return nil, err
}

logp.Info("Publisher name: %s", name)
return p, err
}

func loadOutput(
beatInfo common.BeatInfo,
reg *monitoring.Registry,
outcfg common.ConfigNamespace,
) (outputs.Group, error) {
if publishDisabled {
return outputs.Group{}, nil
}

if !outcfg.IsSet() {
msg := "No outputs are defined. Please define one under the output section."
logp.Info(msg)
return outputs.Fail(errors.New(msg))
}

// TODO: add support to unload/reassign outStats on output reloading
outReg := reg.NewRegistry("output")
outStats := outputs.MakeStats(outReg)

out, err := outputs.Load(beatInfo, &outStats, outcfg.Name(), outcfg.Config())
if err != nil {
return outputs.Fail(err)
}

monitoring.NewString(outReg, "type").Set(outcfg.Name())

return out, nil
}

func createQueueBuilder(config common.ConfigNamespace) (func(queue.Eventer) (queue.Queue, error), error) {
queueType := defaultQueueType
if b := config.Name(); b != "" {
queueType = b
}

queueFactory := queue.FindFactory(queueType)
if queueFactory == nil {
return nil, fmt.Errorf("'%v' is no valid queue type", queueType)
}

queueConfig := config.Config()
if queueConfig == nil {
queueConfig = common.NewConfig()
}

return func(eventer queue.Eventer) (queue.Queue, error) {
return queueFactory(eventer, queueConfig)
}, nil
}
35 changes: 0 additions & 35 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,41 +124,6 @@ type waitCloser struct {

type queueFactory func(queue.Eventer) (queue.Queue, error)

// Load uses a Config object to create a new complete Pipeline instance with
// configured queue and outputs.
func Load(
beatInfo common.BeatInfo,
monitoring *monitoring.Registry,
config Config,
) (*Pipeline, error) {
if !config.Output.IsSet() {
return nil, errors.New("no output configured")
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
return queue.Load(e, config.Queue)
}

output, err := outputs.Load(beatInfo, nil, config.Output.Name(), config.Output.Config())
if err != nil {
return nil, err
}

// TODO: configure pipeline processors
pipeline, err := New(monitoring, queueFactory, output, Settings{
WaitClose: config.WaitShutdown,
WaitCloseMode: WaitOnPipelineClose,
})
if err != nil {
for _, c := range output.Clients {
c.Close()
}
return nil, err
}

return pipeline, nil
}

// New create a new Pipeline instance from a queue instance and a set of outputs.
// The new pipeline will take ownership of queue and outputs. On Close, the
// queue and outputs will be closed.
Expand Down

0 comments on commit 1f4ef9a

Please sign in to comment.