From e76e9142823cb7f2ab9bc31de2dc55e9ea2bd7d0 Mon Sep 17 00:00:00 2001 From: Steffen Siering Date: Mon, 15 Jun 2020 17:21:09 +0200 Subject: [PATCH] Apply Autodiscovery dynamic fields in autoreloading (#19135) --- CHANGELOG-developer.next.asciidoc | 1 + filebeat/beater/crawler.go | 2 +- filebeat/channel/runner.go | 13 +-- filebeat/fileset/factory.go | 6 +- filebeat/fileset/setup.go | 4 +- filebeat/input/input.go | 10 +-- filebeat/input/kafka/input.go | 3 - filebeat/input/log/input.go | 6 +- filebeat/input/mqtt/client_mocked.go | 4 +- filebeat/input/mqtt/input.go | 6 +- filebeat/input/redis/input.go | 7 +- filebeat/input/registry.go | 9 +-- filebeat/input/runnerfactory.go | 5 +- filebeat/input/stdin/input.go | 7 +- filebeat/input/syslog/input.go | 6 +- filebeat/input/tcp/input.go | 6 +- filebeat/input/udp/input.go | 6 +- filebeat/input/unix/input.go | 6 +- heartbeat/beater/heartbeat.go | 2 +- heartbeat/monitors/factory.go | 4 +- heartbeat/monitors/monitor.go | 10 +-- heartbeat/monitors/monitor_test.go | 8 +- heartbeat/monitors/task.go | 4 - libbeat/autodiscover/autodiscover_test.go | 12 +-- libbeat/cfgfile/factories.go | 3 +- libbeat/cfgfile/list.go | 13 ++- libbeat/cfgfile/list_test.go | 83 +++++++++++++++++++- libbeat/cfgfile/reload.go | 2 +- libbeat/publisher/pipetool/pipetool.go | 21 ++++- libbeat/publisher/testing/connector.go | 59 ++++++++++++++ metricbeat/beater/metricbeat.go | 2 +- metricbeat/mb/module/connector.go | 26 +++--- metricbeat/mb/module/example_test.go | 2 +- metricbeat/mb/module/factory.go | 4 +- x-pack/filebeat/input/azureeventhub/input.go | 6 +- x-pack/filebeat/input/cloudfoundry/input.go | 6 +- x-pack/filebeat/input/googlepubsub/input.go | 3 - x-pack/filebeat/input/http_endpoint/input.go | 6 +- x-pack/filebeat/input/httpjson/input.go | 6 +- x-pack/filebeat/input/netflow/input.go | 6 +- x-pack/filebeat/input/o365audit/input.go | 3 - x-pack/filebeat/input/s3/input.go | 3 - 42 files changed, 232 insertions(+), 169 deletions(-) create mode 100644 libbeat/publisher/testing/connector.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index cf93fe5a690d..83329e006a2b 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -48,6 +48,7 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - The Elasticsearch client settings expect the API key to be raw (not base64-encoded). {issue}18939[18939] {pull}18945[18945] - `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114] - `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114] +- Remove `common.MapStrPointer` parameter from `cfgfile.Runnerfactory` interface. {pull}19135[19135] ==== Bugfixes diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index 9ac830f86961..2972a5f14307 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -128,7 +128,7 @@ func (c *crawler) startInput( return fmt.Errorf("input with same ID already exists: %v", id) } - runner, err := c.inputsFactory.Create(pipeline, config, nil) + runner, err := c.inputsFactory.Create(pipeline, config) if err != nil { return fmt.Errorf("Error while initializing input: %+v", err) } diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go index 941e1cc81617..11e31ad88415 100644 --- a/filebeat/channel/runner.go +++ b/filebeat/channel/runner.go @@ -32,7 +32,7 @@ type onCreateFactory struct { create onCreateWrapper } -type onCreateWrapper func(cfgfile.RunnerFactory, beat.PipelineConnector, *common.Config, *common.MapStrPointer) (cfgfile.Runner, error) +type onCreateWrapper func(cfgfile.RunnerFactory, beat.PipelineConnector, *common.Config) (cfgfile.Runner, error) // commonInputConfig defines common input settings // for the publisher pipeline. @@ -63,12 +63,8 @@ func (f *onCreateFactory) CheckConfig(cfg *common.Config) error { return f.factory.CheckConfig(cfg) } -func (f *onCreateFactory) Create( - pipeline beat.PipelineConnector, - cfg *common.Config, - meta *common.MapStrPointer, -) (cfgfile.Runner, error) { - return f.create(f.factory, pipeline, cfg, meta) +func (f *onCreateFactory) Create(pipeline beat.PipelineConnector, cfg *common.Config) (cfgfile.Runner, error) { + return f.create(f.factory, pipeline, cfg) } // RunnerFactoryWithCommonInputSettings wraps a runner factory, such that all runners @@ -93,14 +89,13 @@ func RunnerFactoryWithCommonInputSettings(info beat.Info, f cfgfile.RunnerFactor f cfgfile.RunnerFactory, pipeline beat.PipelineConnector, cfg *common.Config, - meta *common.MapStrPointer, ) (runner cfgfile.Runner, err error) { pipeline, err = withClientConfig(info, pipeline, cfg) if err != nil { return nil, err } - return f.Create(pipeline, cfg, meta) + return f.Create(pipeline, cfg) }) } diff --git a/filebeat/fileset/factory.go b/filebeat/fileset/factory.go index 46e86d7a33f1..8a4d4fe09dc6 100644 --- a/filebeat/fileset/factory.go +++ b/filebeat/fileset/factory.go @@ -76,7 +76,7 @@ func NewFactory( } // Create creates a module based on a config -func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (f *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) { // Start a registry of one module: m, err := NewModuleRegistry([]*common.Config{c}, f.beatInfo, false) if err != nil { @@ -98,7 +98,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *commo inputs := make([]cfgfile.Runner, len(pConfigs)) for i, pConfig := range pConfigs { - inputs[i], err = f.inputFactory.Create(p, pConfig, meta) + inputs[i], err = f.inputFactory.Create(p, pConfig) if err != nil { logp.Err("Error creating input: %s", err) return nil, err @@ -116,7 +116,7 @@ func (f *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *commo } func (f *Factory) CheckConfig(c *common.Config) error { - _, err := f.Create(pubpipeline.NewNilPipeline(), c, nil) + _, err := f.Create(pubpipeline.NewNilPipeline(), c) return err } diff --git a/filebeat/fileset/setup.go b/filebeat/fileset/setup.go index 027e9506dc0c..b76cf2719662 100644 --- a/filebeat/fileset/setup.go +++ b/filebeat/fileset/setup.go @@ -42,7 +42,7 @@ func NewSetupFactory(beatInfo beat.Info, pipelineLoaderFactory PipelineLoaderFac } // Create creates a new SetupCfgRunner to setup module configuration. -func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *common.MapStrPointer) (cfgfile.Runner, error) { +func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) { m, err := NewModuleRegistry([]*common.Config{c}, sf.beatInfo, false) if err != nil { return nil, err @@ -56,7 +56,7 @@ func (sf *SetupFactory) Create(_ beat.PipelineConnector, c *common.Config, _ *co } func (sf *SetupFactory) CheckConfig(c *common.Config) error { - _, err := sf.Create(pubpipeline.NewNilPipeline(), c, nil) + _, err := sf.Create(pubpipeline.NewNilPipeline(), c) return err } diff --git a/filebeat/input/input.go b/filebeat/input/input.go index f488fb1ec873..7870d5979dff 100644 --- a/filebeat/input/input.go +++ b/filebeat/input/input.go @@ -60,7 +60,6 @@ func New( connector channel.Connector, beatDone chan struct{}, states []file.State, - dynFields *common.MapStrPointer, ) (*Runner, error) { input := &Runner{ config: defaultConfig, @@ -82,11 +81,10 @@ func New( } context := Context{ - States: states, - Done: input.done, - BeatDone: input.beatDone, - DynamicFields: dynFields, - Meta: nil, + States: states, + Done: input.done, + BeatDone: input.beatDone, + Meta: nil, } var ipt Input ipt, err = f(conf, connector, context) diff --git a/filebeat/input/kafka/input.go b/filebeat/input/kafka/input.go index 5dadb0585124..b29ae5354d50 100644 --- a/filebeat/input/kafka/input.go +++ b/filebeat/input/kafka/input.go @@ -69,9 +69,6 @@ func NewInput( } out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, - }, ACKEvents: func(events []interface{}) { for _, event := range events { if meta, ok := event.(eventMeta); ok { diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 1c59e5b73d37..032f5c11c92f 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -107,11 +107,7 @@ func NewInput( // The outlet generated here is the underlying outlet, only closed // once all workers have been shut down. // For state updates and events, separate sub-outlets will be used. - out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, - }) + out, err := outlet.Connect(cfg) if err != nil { return nil, err } diff --git a/filebeat/input/mqtt/client_mocked.go b/filebeat/input/mqtt/client_mocked.go index 2ac2ccb77a03..472164d8e837 100644 --- a/filebeat/input/mqtt/client_mocked.go +++ b/filebeat/input/mqtt/client_mocked.go @@ -190,8 +190,8 @@ type mockedConnector struct { var _ channel.Connector = new(mockedConnector) -func (m *mockedConnector) Connect(*common.Config) (channel.Outleter, error) { - panic("implement me") +func (m *mockedConnector) Connect(c *common.Config) (channel.Outleter, error) { + return m.ConnectWith(c, beat.ClientConfig{}) } func (m *mockedConnector) ConnectWith(*common.Config, beat.ClientConfig) (channel.Outleter, error) { diff --git a/filebeat/input/mqtt/input.go b/filebeat/input/mqtt/input.go index a802a43c6281..9266429a200a 100644 --- a/filebeat/input/mqtt/input.go +++ b/filebeat/input/mqtt/input.go @@ -79,11 +79,7 @@ func newInput( return nil, errors.Wrap(err, "reading mqtt input config") } - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, - }, - }) + out, err := connector.Connect(cfg) if err != nil { return nil, err } diff --git a/filebeat/input/redis/input.go b/filebeat/input/redis/input.go index a784af74c055..8b3b95d013ec 100644 --- a/filebeat/input/redis/input.go +++ b/filebeat/input/redis/input.go @@ -26,7 +26,6 @@ import ( "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/filebeat/input/file" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/logp" @@ -59,11 +58,7 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con return nil, err } - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, - }) + out, err := connector.Connect(cfg) if err != nil { return nil, err } diff --git a/filebeat/input/registry.go b/filebeat/input/registry.go index 06b23d3ac189..74f5dda8bbac 100644 --- a/filebeat/input/registry.go +++ b/filebeat/input/registry.go @@ -27,11 +27,10 @@ import ( ) type Context struct { - States []file.State - Done chan struct{} - BeatDone chan struct{} - DynamicFields *common.MapStrPointer - Meta map[string]string + States []file.State + Done chan struct{} + BeatDone chan struct{} + Meta map[string]string } // Factory is used to register functions creating new Input instances. diff --git a/filebeat/input/runnerfactory.go b/filebeat/input/runnerfactory.go index a47e848bc6d4..179057c43737 100644 --- a/filebeat/input/runnerfactory.go +++ b/filebeat/input/runnerfactory.go @@ -46,10 +46,9 @@ func NewRunnerFactory(outlet channel.Factory, registrar *registrar.Registrar, be func (r *RunnerFactory) Create( pipeline beat.PipelineConnector, c *common.Config, - meta *common.MapStrPointer, ) (cfgfile.Runner, error) { connector := r.outlet(pipeline) - p, err := New(c, connector, r.beatDone, r.registrar.GetStates(), meta) + p, err := New(c, connector, r.beatDone, r.registrar.GetStates()) if err != nil { // In case of error with loading state, input is still returned return p, err @@ -59,6 +58,6 @@ func (r *RunnerFactory) Create( } func (r *RunnerFactory) CheckConfig(cfg *common.Config) error { - _, err := r.Create(pipeline.NewNilPipeline(), cfg, nil) + _, err := r.Create(pipeline.NewNilPipeline(), cfg) return err } diff --git a/filebeat/input/stdin/input.go b/filebeat/input/stdin/input.go index 8273da27f622..0e8fbd0fc104 100644 --- a/filebeat/input/stdin/input.go +++ b/filebeat/input/stdin/input.go @@ -25,7 +25,6 @@ import ( "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/filebeat/input/log" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/logp" ) @@ -49,11 +48,7 @@ type Input struct { // NewInput creates a new stdin input // This input contains one harvester which is reading from stdin func NewInput(cfg *common.Config, outlet channel.Connector, context input.Context) (input.Input, error) { - out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, - }) + out, err := outlet.Connect(cfg) if err != nil { return nil, err } diff --git a/filebeat/input/syslog/input.go b/filebeat/input/syslog/input.go index ecbc8db9cd80..7e93ca162e14 100644 --- a/filebeat/input/syslog/input.go +++ b/filebeat/input/syslog/input.go @@ -112,11 +112,7 @@ func NewInput( log := logp.NewLogger("syslog") - out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, - }) + out, err := outlet.Connect(cfg) if err != nil { return nil, err } diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 5b3b8dc83949..96c6ce990222 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -57,11 +57,7 @@ func NewInput( context input.Context, ) (input.Input, error) { - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, - }) + out, err := connector.Connect(cfg) if err != nil { return nil, err } diff --git a/filebeat/input/udp/input.go b/filebeat/input/udp/input.go index 9d5f45208908..fe211537b0b4 100644 --- a/filebeat/input/udp/input.go +++ b/filebeat/input/udp/input.go @@ -53,11 +53,7 @@ func NewInput( context input.Context, ) (input.Input, error) { - out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, - }) + out, err := outlet.Connect(cfg) if err != nil { return nil, err } diff --git a/filebeat/input/unix/input.go b/filebeat/input/unix/input.go index 19609cb5ab8a..3c827c7cd92c 100644 --- a/filebeat/input/unix/input.go +++ b/filebeat/input/unix/input.go @@ -59,11 +59,7 @@ func NewInput( ) (input.Input, error) { cfgwarn.Beta("Unix socket support is beta.") - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, - }) + out, err := connector.Connect(cfg) if err != nil { return nil, err } diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 817fea40b6d6..7e0f2aa75a1e 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -126,7 +126,7 @@ func (bt *Heartbeat) RunStaticMonitors(b *beat.Beat) error { factory := monitors.NewFactory(bt.scheduler, true) for _, cfg := range bt.config.Monitors { - created, err := factory.Create(b.Publisher, cfg, nil) + created, err := factory.Create(b.Publisher, cfg) if err != nil { return errors.Wrap(err, "could not create monitor") } diff --git a/heartbeat/monitors/factory.go b/heartbeat/monitors/factory.go index 26937020dd93..e453bc3a03ac 100644 --- a/heartbeat/monitors/factory.go +++ b/heartbeat/monitors/factory.go @@ -37,8 +37,8 @@ func NewFactory(sched *scheduler.Scheduler, allowWatches bool) *RunnerFactory { } // Create makes a new Runner for a new monitor with the given Config. -func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { - monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches, meta) +func (f *RunnerFactory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) { + monitor, err := newMonitor(c, globalPluginsReg, p, f.sched, f.allowWatches) return monitor, err } diff --git a/heartbeat/monitors/monitor.go b/heartbeat/monitors/monitor.go index 0326c4094156..e0fcb886357d 100644 --- a/heartbeat/monitors/monitor.go +++ b/heartbeat/monitors/monitor.go @@ -62,8 +62,7 @@ type Monitor struct { // stats is the countersRecorder used to record lifecycle events // for global metrics + telemetry - stats registryRecorder - factoryMetadata *common.MapStrPointer + stats registryRecorder } // String prints a description of the monitor in a threadsafe way. It is important that this use threadsafe @@ -73,7 +72,7 @@ func (m *Monitor) String() string { } func checkMonitorConfig(config *common.Config, registrar *pluginsReg, allowWatches bool) error { - m, err := newMonitor(config, registrar, nil, nil, allowWatches, nil) + m, err := newMonitor(config, registrar, nil, nil, allowWatches) if m != nil { m.Stop() // Stop the monitor to free up the ID from uniqueness checks } @@ -101,9 +100,8 @@ func newMonitor( pipelineConnector beat.PipelineConnector, scheduler *scheduler.Scheduler, allowWatches bool, - factoryMetadata *common.MapStrPointer, ) (*Monitor, error) { - m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches, factoryMetadata) + m, err := newMonitorUnsafe(config, registrar, pipelineConnector, scheduler, allowWatches) if m != nil && err != nil { m.Stop() } @@ -118,7 +116,6 @@ func newMonitorUnsafe( pipelineConnector beat.PipelineConnector, scheduler *scheduler.Scheduler, allowWatches bool, - factoryMetadata *common.MapStrPointer, ) (*Monitor, error) { // Extract just the Id, Type, and Enabled fields from the config // We'll parse things more precisely later once we know what exact type of @@ -145,7 +142,6 @@ func newMonitorUnsafe( internalsMtx: sync.Mutex{}, config: config, stats: monitorPlugin.stats, - factoryMetadata: factoryMetadata, } if m.id != "" { diff --git a/heartbeat/monitors/monitor_test.go b/heartbeat/monitors/monitor_test.go index 50553a06e89a..341839e382dd 100644 --- a/heartbeat/monitors/monitor_test.go +++ b/heartbeat/monitors/monitor_test.go @@ -41,7 +41,7 @@ func TestMonitor(t *testing.T) { require.NoError(t, err) defer sched.Stop() - mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false, nil) + mon, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false) require.NoError(t, err) mon.Start() @@ -90,11 +90,11 @@ func TestDuplicateMonitorIDs(t *testing.T) { defer sched.Stop() makeTestMon := func() (*Monitor, error) { - return newMonitor(serverMonConf, reg, pipelineConnector, sched, false, nil) + return newMonitor(serverMonConf, reg, pipelineConnector, sched, false) } // Ensure that an error is returned on a bad config - _, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, false, nil) + _, m0Err := newMonitor(badConf, reg, pipelineConnector, sched, false) require.Error(t, m0Err) // Would fail if the previous newMonitor didn't free the monitor.id @@ -118,7 +118,7 @@ func TestCheckInvalidConfig(t *testing.T) { require.NoError(t, err) defer sched.Stop() - m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false, nil) + m, err := newMonitor(serverMonConf, reg, pipelineConnector, sched, false) // This could change if we decide the contract for newMonitor should always return a monitor require.Nil(t, m, "For this test to work we need a nil value for the monitor.") diff --git a/heartbeat/monitors/task.go b/heartbeat/monitors/task.go index 4c045bf6513c..92d28d225d94 100644 --- a/heartbeat/monitors/task.go +++ b/heartbeat/monitors/task.go @@ -102,10 +102,6 @@ func (t *configuredJob) Start() { var err error fields := common.MapStr{"event": common.MapStr{"dataset": "uptime"}} - if t.monitor.factoryMetadata != nil { - fields.DeepUpdate(t.monitor.factoryMetadata.Get()) - } - t.client, err = t.monitor.pipelineConnector.ConnectWith(beat.ClientConfig{ Processing: beat.ProcessingConfig{ EventMetadata: t.config.EventMetadata, diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index 23a3fe14da32..deec66ece8e7 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -37,7 +37,6 @@ import ( type mockRunner struct { mutex sync.Mutex config *common.Config - meta *common.MapStrPointer started, stopped bool } @@ -56,7 +55,6 @@ func (m *mockRunner) Clone() *mockRunner { defer m.mutex.Unlock() return &mockRunner{ config: m.config, - meta: m.meta, started: m.started, stopped: m.stopped, } @@ -93,10 +91,9 @@ func (m *mockAdapter) CheckConfig(c *common.Config) error { return nil } -func (m *mockAdapter) Create(_ beat.PipelineConnector, config *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (m *mockAdapter) Create(_ beat.PipelineConnector, config *common.Config) (cfgfile.Runner, error) { runner := &mockRunner{ config: config, - meta: meta, } m.mutex.Lock() defer m.mutex.Unlock() @@ -191,7 +188,6 @@ func TestAutodiscover(t *testing.T) { runners := adapter.Runners() assert.Equal(t, len(runners), 1) assert.Equal(t, len(autodiscover.configs["mock:foo"]), 1) - assert.Equal(t, runners[0].meta.Get()["foo"], "bar") assert.True(t, runners[0].started) assert.False(t, runners[0].stopped) @@ -204,12 +200,10 @@ func TestAutodiscover(t *testing.T) { "foo": "baz", }, }) - wait(t, func() bool { return adapter.Runners()[0].meta.Get()["foo"] == "baz" }) runners = adapter.Runners() assert.Equal(t, len(runners), 1) assert.Equal(t, len(autodiscover.configs["mock:foo"]), 1) - assert.Equal(t, runners[0].meta.Get()["foo"], "baz") // meta is updated assert.True(t, runners[0].started) assert.False(t, runners[0].stopped) @@ -236,7 +230,6 @@ func TestAutodiscover(t *testing.T) { assert.Equal(t, len(runners), 2) assert.Equal(t, len(autodiscover.configs["mock:foo"]), 1) assert.True(t, runners[0].stopped) - assert.Equal(t, runners[1].meta.Get()["foo"], "baz") assert.True(t, runners[1].started) assert.False(t, runners[1].stopped) @@ -254,7 +247,6 @@ func TestAutodiscover(t *testing.T) { runners = adapter.Runners() assert.Equal(t, len(runners), 2) assert.Equal(t, len(autodiscover.configs["mock:foo"]), 0) - assert.Equal(t, runners[1].meta.Get()["foo"], "baz") assert.True(t, runners[1].started) assert.True(t, runners[1].stopped) } @@ -318,10 +310,8 @@ func TestAutodiscoverHash(t *testing.T) { runners := adapter.Runners() assert.Equal(t, len(runners), 2) assert.Equal(t, len(autodiscover.configs["mock:foo"]), 2) - assert.Equal(t, runners[0].meta.Get()["foo"], "bar") assert.True(t, runners[0].started) assert.False(t, runners[0].stopped) - assert.Equal(t, runners[1].meta.Get()["foo"], "bar") assert.True(t, runners[1].started) assert.False(t, runners[1].stopped) } diff --git a/libbeat/cfgfile/factories.go b/libbeat/cfgfile/factories.go index 095a70772050..0df2b18c38d6 100644 --- a/libbeat/cfgfile/factories.go +++ b/libbeat/cfgfile/factories.go @@ -67,13 +67,12 @@ func MatchDefault(factory RunnerFactory) FactoryMatcher { func (f multiplexedFactory) Create( p beat.PipelineConnector, config *common.Config, - meta *common.MapStrPointer, ) (Runner, error) { factory, err := f.findFactory(config) if err != nil { return nil, err } - return factory.Create(p, config, meta) + return factory.Create(p, config) } func (f multiplexedFactory) CheckConfig(c *common.Config) error { diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 975dcee1c288..c38689d65b48 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -28,6 +28,7 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/beats/v7/libbeat/publisher/pipetool" ) // RunnerList implements a reloadable.List of Runners @@ -88,10 +89,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { // Start new runners for hash, config := range startList { - // Pass a copy of the config to the factory, this way if the factory modifies it, - // that doesn't affect the hash of the original one. - c, _ := common.NewConfigFrom(config.Config) - runner, err := r.factory.Create(r.pipeline, c, config.Meta) + runner, err := createRunner(r.factory, r.pipeline, config) if err != nil { r.logger.Errorf("Error creating runner from config: %s", err) errs = append(errs, errors.Wrap(err, "Error creating runner from config")) @@ -157,3 +155,10 @@ func (r *RunnerList) copyRunnerList() map[uint64]Runner { } return list } + +func createRunner(factory RunnerFactory, pipeline beat.PipelineConnector, config *reload.ConfigWithMeta) (Runner, error) { + // Pass a copy of the config to the factory, this way if the factory modifies it, + // that doesn't affect the hash of the original one. + c, _ := common.NewConfigFrom(config.Config) + return factory.Create(pipetool.WithDynamicFields(pipeline, config.Meta), c) +} diff --git a/libbeat/cfgfile/list_test.go b/libbeat/cfgfile/list_test.go index 3977efabaf08..01007b64cc13 100644 --- a/libbeat/cfgfile/list_test.go +++ b/libbeat/cfgfile/list_test.go @@ -26,24 +26,41 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" + pubtest "github.com/elastic/beats/v7/libbeat/publisher/testing" ) type runner struct { id int64 started bool stopped bool + OnStart func() + OnStop func() } func (r *runner) String() string { return "test runner" } -func (r *runner) Start() { r.started = true } -func (r *runner) Stop() { r.stopped = true } +func (r *runner) Start() { + r.started = true + if r.OnStart != nil { + r.OnStart() + } +} + +func (r *runner) Stop() { + if r.OnStop != nil { + r.OnStop() + } + r.stopped = true +} -type runnerFactory struct{ runners []*runner } +type runnerFactory struct { + CreateRunner func(beat.PipelineConnector, *common.Config) (Runner, error) + runners []Runner +} -func (r *runnerFactory) Create(x beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (Runner, error) { +func (r *runnerFactory) Create(x beat.PipelineConnector, c *common.Config) (Runner, error) { config := struct { ID int64 `config:"id"` }{} @@ -58,6 +75,15 @@ func (r *runnerFactory) Create(x beat.PipelineConnector, c *common.Config, meta return nil, errors.New("Invalid config") } + if r.CreateRunner != nil { + runner, err := r.CreateRunner(x, c) + if err != nil { + return nil, err + } + r.runners = append(r.runners, runner) + return runner, err + } + runner := &runner{id: config.ID} r.runners = append(r.runners, runner) return runner, err @@ -205,6 +231,55 @@ func TestHas(t *testing.T) { assert.False(t, list.Has(0)) } +func TestCreateRunnerAddsDynamicMeta(t *testing.T) { + newMapStrPointer := func(m common.MapStr) *common.MapStrPointer { + p := common.NewMapStrPointer(m) + return &p + } + + cases := map[string]struct { + meta *common.MapStrPointer + }{ + "no dynamic metadata": {}, + "with dynamic fields": { + meta: newMapStrPointer(common.MapStr{"test": 1}), + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + + factory := &runnerFactory{ + CreateRunner: func(p beat.PipelineConnector, cfg *common.Config) (Runner, error) { + return &runner{ + OnStart: func() { + c, _ := p.Connect() + c.Close() + }, + }, nil + }, + } + + var config beat.ClientConfig + pipeline := &pubtest.FakeConnector{ + ConnectFunc: func(cfg beat.ClientConfig) (beat.Client, error) { + config = cfg + return &pubtest.FakeClient{}, nil + }, + } + + runner, _ := createRunner(factory, pipeline, &reload.ConfigWithMeta{ + Config: common.NewConfig(), + Meta: test.meta, + }) + runner.Start() + runner.Stop() + + assert.Equal(t, test.meta, config.Processing.DynamicFields) + }) + } +} + func createConfig(id int64) *reload.ConfigWithMeta { c := common.NewConfig() c.SetInt("id", -1, id) diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index d5b9d9c315fe..a4ae1e6733b7 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -73,7 +73,7 @@ type Reload struct { // of new Runners type RunnerFactory interface { // Create creates a new Runner based on the given configuration. - Create(p beat.PipelineConnector, config *common.Config, meta *common.MapStrPointer) (Runner, error) + Create(p beat.PipelineConnector, config *common.Config) (Runner, error) // CheckConfig tests if a confiugation can be used to create an input. If it // is not possible to create an input using the configuration, an error must diff --git a/libbeat/publisher/pipetool/pipetool.go b/libbeat/publisher/pipetool/pipetool.go index a05ec6e6022e..af734f1d97a5 100644 --- a/libbeat/publisher/pipetool/pipetool.go +++ b/libbeat/publisher/pipetool/pipetool.go @@ -17,7 +17,10 @@ package pipetool -import "github.com/elastic/beats/v7/libbeat/beat" +import ( + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" +) // connectEditPipeline modifies the client configuration using edit before calling // edit. @@ -89,3 +92,19 @@ func WithDefaultGuarantees(pipeline beat.PipelineConnector, mode beat.PublishMod func WithClientWrapper(pipeline beat.PipelineConnector, wrap ClientWrapper) beat.PipelineConnector { return &wrapClientPipeline{parent: pipeline, wrapper: wrap} } + +// WithDynamicFields ensures that dynamicFields from autodiscovery are setup +// when connecting to the publisher pipeline. +// Processing.DynamicFields will only be overwritten if not is not already set. +func WithDynamicFields(pipeline beat.PipelineConnector, dynamicFields *common.MapStrPointer) beat.PipelineConnector { + if dynamicFields == nil { + return pipeline + } + + return WithClientConfigEdit(pipeline, func(cfg beat.ClientConfig) (beat.ClientConfig, error) { + if cfg.Processing.DynamicFields == nil { + cfg.Processing.DynamicFields = dynamicFields + } + return cfg, nil + }) +} diff --git a/libbeat/publisher/testing/connector.go b/libbeat/publisher/testing/connector.go new file mode 100644 index 000000000000..2b791ffd921d --- /dev/null +++ b/libbeat/publisher/testing/connector.go @@ -0,0 +1,59 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package testing + +import "github.com/elastic/beats/v7/libbeat/beat" + +type FakeConnector struct { + ConnectFunc func(beat.ClientConfig) (beat.Client, error) +} + +type FakeClient struct { + PublishFunc func(beat.Event) + CloseFunc func() error +} + +var _ beat.PipelineConnector = FakeConnector{} +var _ beat.Client = (*FakeClient)(nil) + +func (c FakeConnector) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + return c.ConnectFunc(cfg) +} + +func (c FakeConnector) Connect() (beat.Client, error) { + return c.ConnectWith(beat.ClientConfig{}) +} + +func (c *FakeClient) Publish(event beat.Event) { + if c.PublishFunc != nil { + c.PublishFunc(event) + } +} + +func (c *FakeClient) Close() error { + if c.CloseFunc == nil { + return nil + } + return c.CloseFunc() +} + +func (c *FakeClient) PublishAll(events []beat.Event) { + for _, event := range events { + c.Publish(event) + } +} diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index bdd45ac36930..75e6f16652eb 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -166,7 +166,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe continue } - runner, err := factory.Create(b.Publisher, moduleCfg, nil) + runner, err := factory.Create(b.Publisher, moduleCfg) if err != nil { return nil, err } diff --git a/metricbeat/mb/module/connector.go b/metricbeat/mb/module/connector.go index ea2292bd74cc..bff016fff4c1 100644 --- a/metricbeat/mb/module/connector.go +++ b/metricbeat/mb/module/connector.go @@ -30,12 +30,11 @@ import ( // Connector configures and establishes a beat.Client for publishing events // to the publisher pipeline. type Connector struct { - pipeline beat.PipelineConnector - processors *processors.Processors - eventMeta common.EventMetadata - dynamicFields *common.MapStrPointer - timeSeries bool - keepNull bool + pipeline beat.PipelineConnector + processors *processors.Processors + eventMeta common.EventMetadata + timeSeries bool + keepNull bool } type connectorConfig struct { @@ -54,8 +53,9 @@ type metricSetRegister interface { } func NewConnector( - beatInfo beat.Info, pipeline beat.PipelineConnector, - c *common.Config, dynFields *common.MapStrPointer, + beatInfo beat.Info, + pipeline beat.PipelineConnector, + c *common.Config, ) (*Connector, error) { config := connectorConfig{} if err := c.Unpack(&config); err != nil { @@ -68,11 +68,10 @@ func NewConnector( } return &Connector{ - pipeline: pipeline, - processors: processors, - eventMeta: config.EventMetadata, - dynamicFields: dynFields, - keepNull: config.KeepNull, + pipeline: pipeline, + processors: processors, + eventMeta: config.EventMetadata, + keepNull: config.KeepNull, }, nil } @@ -102,7 +101,6 @@ func (c *Connector) Connect() (beat.Client, error) { Processing: beat.ProcessingConfig{ EventMetadata: c.eventMeta, Processor: c.processors, - DynamicFields: c.dynamicFields, KeepNull: c.keepNull, }, }) diff --git a/metricbeat/mb/module/example_test.go b/metricbeat/mb/module/example_test.go index eee9dda60b8b..ba94f2162d2b 100644 --- a/metricbeat/mb/module/example_test.go +++ b/metricbeat/mb/module/example_test.go @@ -132,7 +132,7 @@ func ExampleRunner() { return } - connector, err := module.NewConnector(b.Info, b.Publisher, config, nil) + connector, err := module.NewConnector(b.Info, b.Publisher, config) if err != nil { return } diff --git a/metricbeat/mb/module/factory.go b/metricbeat/mb/module/factory.go index 9256fc5b5b11..d5b342afe454 100644 --- a/metricbeat/mb/module/factory.go +++ b/metricbeat/mb/module/factory.go @@ -40,7 +40,7 @@ func NewFactory(beatInfo beat.Info, options ...Option) *Factory { } // Create creates a new metricbeat module runner reporting events to the passed pipeline. -func (r *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *common.MapStrPointer) (cfgfile.Runner, error) { +func (r *Factory) Create(p beat.PipelineConnector, c *common.Config) (cfgfile.Runner, error) { module, metricSets, err := mb.NewModule(c, mb.Registry) if err != nil { return nil, err @@ -53,7 +53,7 @@ func (r *Factory) Create(p beat.PipelineConnector, c *common.Config, meta *commo return nil, err } - connector, err := NewConnector(r.beatInfo, p, c, meta) + connector, err := NewConnector(r.beatInfo, p, c) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/azureeventhub/input.go b/x-pack/filebeat/input/azureeventhub/input.go index de9c7cf9d79a..010e2807b560 100644 --- a/x-pack/filebeat/input/azureeventhub/input.go +++ b/x-pack/filebeat/input/azureeventhub/input.go @@ -85,11 +85,7 @@ func NewInput( workerCtx: workerCtx, workerCancel: workerCancel, } - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, - }, - }) + out, err := connector.Connect(cfg) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/cloudfoundry/input.go b/x-pack/filebeat/input/cloudfoundry/input.go index ea2152129a2b..65755d1f2c1f 100644 --- a/x-pack/filebeat/input/cloudfoundry/input.go +++ b/x-pack/filebeat/input/cloudfoundry/input.go @@ -42,11 +42,7 @@ func NewInput( ) (input.Input, error) { log := logp.NewLogger("cloudfoundry") - out, err := outlet.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, - }) + out, err := outlet.Connect(cfg) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/googlepubsub/input.go b/x-pack/filebeat/input/googlepubsub/input.go index e9f48073d744..1f445304f38d 100644 --- a/x-pack/filebeat/input/googlepubsub/input.go +++ b/x-pack/filebeat/input/googlepubsub/input.go @@ -92,9 +92,6 @@ func NewInput( // Build outlet for events. in.outlet, err = connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, - }, ACKEvents: func(privates []interface{}) { for _, priv := range privates { if msg, ok := priv.(*pubsub.Message); ok { diff --git a/x-pack/filebeat/input/http_endpoint/input.go b/x-pack/filebeat/input/http_endpoint/input.go index 5c0785f03132..555880fab6d3 100644 --- a/x-pack/filebeat/input/http_endpoint/input.go +++ b/x-pack/filebeat/input/http_endpoint/input.go @@ -62,11 +62,7 @@ func NewInput( } // Build outlet for events. - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, - }, - }) + out, err := connector.Connect(cfg) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 4cbb90824590..aa23a5f8cf65 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -71,11 +71,7 @@ func NewInput( return nil, err } // Build outlet for events. - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, - }, - }) + out, err := connector.Connect(cfg) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/netflow/input.go b/x-pack/filebeat/input/netflow/input.go index cc8d30815b02..0fbb7dfe2aeb 100644 --- a/x-pack/filebeat/input/netflow/input.go +++ b/x-pack/filebeat/input/netflow/input.go @@ -92,11 +92,7 @@ func NewInput( initLogger.Do(func() { logger = logp.NewLogger(inputName) }) - out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, - }) + out, err := connector.Connect(cfg) if err != nil { return nil, err } diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index cafba2184f3a..b60d5d00455d 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -83,9 +83,6 @@ func newInput( var out channel.Outleter out, err = connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: inputContext.DynamicFields, - }, ACKLastEvent: func(private interface{}) { // Errors don't have a cursor. if cursor, ok := private.(cursor); ok { diff --git a/x-pack/filebeat/input/s3/input.go b/x-pack/filebeat/input/s3/input.go index e3a0972f5b66..b8ebb4d755ce 100644 --- a/x-pack/filebeat/input/s3/input.go +++ b/x-pack/filebeat/input/s3/input.go @@ -137,9 +137,6 @@ func NewInput(cfg *common.Config, connector channel.Connector, context input.Con } out, err := connector.ConnectWith(cfg, beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - DynamicFields: context.DynamicFields, - }, ACKEvents: func(privates []interface{}) { for _, private := range privates { if s3Context, ok := private.(*s3Context); ok {