diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go new file mode 100644 index 00000000000..67bc9c7ac13 --- /dev/null +++ b/filebeat/input/v2/compat/compat.go @@ -0,0 +1,153 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package compat provides helpers for integrating the input/v2 API with +// existing input based features like autodiscovery, config file reloading, or +// filebeat modules. +package compat + +import ( + "fmt" + "sync" + + "github.com/mitchellh/hashstructure" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + "github.com/elastic/go-concert" +) + +// factory implements the cfgfile.RunnerFactory interface and wraps the +// v2.Loader to create cfgfile.Runner instances based on available v2 inputs. +type factory struct { + log *logp.Logger + info beat.Info + loader *v2.Loader +} + +// runner wraps a v2.Input, starting a go-routine +// On start the runner spawns a go-routine that will call (v2.Input).Run with +// the `sig` setup for shutdown signaling. +// On stop the runner triggers the shutdown signal and waits until the input +// has returned. +type runner struct { + id string + log *logp.Logger + agent *beat.Info + wg sync.WaitGroup + sig *concert.OnceSignaler + input v2.Input + connector beat.PipelineConnector +} + +// RunnerFactory creates a cfgfile.RunnerFactory from an input Loader that is +// compatible with config file based input reloading, autodiscovery, and filebeat modules. +// The RunnerFactory is can be used to integrate v2 inputs into existing Beats. +func RunnerFactory( + log *logp.Logger, + info beat.Info, + loader *v2.Loader, +) cfgfile.RunnerFactory { + return &factory{log: log, info: info, loader: loader} +} + +func (f *factory) CheckConfig(cfg *common.Config) error { + _, err := f.loader.Configure(cfg) + if err != nil { + return err + } + return nil +} + +func (f *factory) Create( + p beat.PipelineConnector, + config *common.Config, +) (cfgfile.Runner, error) { + input, err := f.loader.Configure(config) + if err != nil { + return nil, err + } + + id, err := configID(config) + if err != nil { + return nil, err + } + + return &runner{ + id: id, + log: f.log.Named(input.Name()), + agent: &f.info, + sig: concert.NewOnceSignaler(), + input: input, + connector: p, + }, nil +} + +func (r *runner) String() string { return r.input.Name() } + +func (r *runner) Start() { + log := r.log + name := r.input.Name() + + go func() { + log.Infof("Input %v starting", name) + err := r.input.Run( + v2.Context{ + ID: r.id, + Agent: *r.agent, + Logger: log, + Cancelation: r.sig, + }, + r.connector, + ) + if err != nil { + log.Errorf("Input '%v' failed with: %+v", name, err) + } else { + log.Infof("Input '%v' stopped", name) + } + }() +} + +func (r *runner) Stop() { + r.sig.Trigger() + r.wg.Wait() + r.log.Infof("Input '%v' stopped", r.input.Name()) +} + +func configID(config *common.Config) (string, error) { + tmp := struct { + ID string `config:"id"` + }{} + if err := config.Unpack(&tmp); err != nil { + return "", fmt.Errorf("error extracting ID: %w", err) + } + if tmp.ID != "" { + return tmp.ID, nil + } + + var h map[string]interface{} + config.Unpack(&h) + id, err := hashstructure.Hash(h, nil) + if err != nil { + return "", fmt.Errorf("can not compute id from configuration: %w", err) + } + + return fmt.Sprintf("%16X", id), nil +} diff --git a/filebeat/input/v2/compat/compat_test.go b/filebeat/input/v2/compat/compat_test.go new file mode 100644 index 00000000000..4973b3928c2 --- /dev/null +++ b/filebeat/input/v2/compat/compat_test.go @@ -0,0 +1,120 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package compat + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/filebeat/input/v2/internal/inputest" +) + +func TestRunnerFactory_CheckConfig(t *testing.T) { + t.Run("does not run or test configured input", func(t *testing.T) { + log := logp.NewLogger("test") + var countConfigure, countTest, countRun int + + // setup + plugins := inputest.SinglePlugin("test", &inputest.MockInputManager{ + OnConfigure: func(_ *common.Config) (v2.Input, error) { + countConfigure++ + return &inputest.MockInput{ + OnTest: func(_ v2.TestContext) error { countTest++; return nil }, + OnRun: func(_ v2.Context, _ beat.PipelineConnector) error { countRun++; return nil }, + }, nil + }, + }) + loader := inputest.MustNewTestLoader(t, plugins, "type", "test") + factory := RunnerFactory(log, beat.Info{}, loader.Loader) + + // run + err := factory.CheckConfig(common.NewConfig()) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // validate: configured an input, but do not run test or run + assert.Equal(t, 1, countConfigure) + assert.Equal(t, 0, countTest) + assert.Equal(t, 0, countRun) + }) + + t.Run("fail if input type is unknown to loader", func(t *testing.T) { + log := logp.NewLogger("test") + plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil)) + loader := inputest.MustNewTestLoader(t, plugins, "type", "") + factory := RunnerFactory(log, beat.Info{}, loader.Loader) + + // run + err := factory.CheckConfig(common.MustNewConfigFrom(map[string]interface{}{ + "type": "unknown", + })) + assert.Error(t, err) + }) +} + +func TestRunnerFactory_CreateAndRun(t *testing.T) { + t.Run("runner can correctly start and stop inputs", func(t *testing.T) { + log := logp.NewLogger("test") + var countRun int + var wg sync.WaitGroup + plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(&inputest.MockInput{ + OnRun: func(ctx v2.Context, _ beat.PipelineConnector) error { + defer wg.Done() + countRun++ + <-ctx.Cancelation.Done() + return nil + }, + })) + loader := inputest.MustNewTestLoader(t, plugins, "type", "test") + factory := RunnerFactory(log, beat.Info{}, loader.Loader) + + runner, err := factory.Create(nil, common.MustNewConfigFrom(map[string]interface{}{ + "type": "test", + })) + require.NoError(t, err) + + wg.Add(1) + runner.Start() + runner.Stop() + wg.Wait() + assert.Equal(t, 1, countRun) + }) + + t.Run("fail if input type is unknown to loader", func(t *testing.T) { + log := logp.NewLogger("test") + plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil)) + loader := inputest.MustNewTestLoader(t, plugins, "type", "") + factory := RunnerFactory(log, beat.Info{}, loader.Loader) + + // run + runner, err := factory.Create(nil, common.MustNewConfigFrom(map[string]interface{}{ + "type": "unknown", + })) + assert.Nil(t, runner) + assert.Error(t, err) + }) +} diff --git a/filebeat/input/v2/compat/composed.go b/filebeat/input/v2/compat/composed.go new file mode 100644 index 00000000000..26c274e8891 --- /dev/null +++ b/filebeat/input/v2/compat/composed.go @@ -0,0 +1,77 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package compat + +import ( + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" +) + +// composeFactory combines to factories. Instances are created using the Combine function. +// For each operation the configured factory will be tried first. If the +// operation failed (for example the input type is unknown) the fallback factory is tried. +type composeFactory struct { + factory cfgfile.RunnerFactory + fallback cfgfile.RunnerFactory +} + +// Combine takes two RunnerFactory instances and creates a new RunnerFactory. +// The new factory will first try to create an input using factory. If this operation fails fallback will be used. +// +// The new RunnerFactory will return the error of fallback only if factory did +// signal that the input type is unknown via v2.ErrUnknown. +// +// XXX: This RunnerFactory is used for combining the v2.Loader with the +// existing RunnerFactory for inputs in Filebeat. The Combine function should be removed once the old RunnerFactory is removed. +func Combine(factory, fallback cfgfile.RunnerFactory) cfgfile.RunnerFactory { + return composeFactory{factory: factory, fallback: fallback} +} + +func (f composeFactory) CheckConfig(cfg *common.Config) error { + err := f.factory.CheckConfig(cfg) + if !v2.IsUnknownInputError(err) { + return err + } + return f.fallback.CheckConfig(cfg) +} + +func (f composeFactory) Create( + p beat.PipelineConnector, + config *common.Config, +) (cfgfile.Runner, error) { + var runner cfgfile.Runner + var err1, err2 error + + runner, err1 = f.factory.Create(p, config) + if err1 == nil { + return runner, nil + } + + runner, err2 = f.fallback.Create(p, config) + if err2 == nil { + return runner, nil + } + + // return err2 only if err1 indicates that the input type is not known to f.factory + if v2.IsUnknownInputError(err1) { + return nil, err2 + } + return nil, err1 +} diff --git a/filebeat/input/v2/compat/composed_test.go b/filebeat/input/v2/compat/composed_test.go new file mode 100644 index 00000000000..b2ae6c3cb46 --- /dev/null +++ b/filebeat/input/v2/compat/composed_test.go @@ -0,0 +1,202 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package compat + +import ( + "errors" + "testing" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" +) + +type fakeRunnerFactory struct { + OnCheck func(*common.Config) error + OnCreate func(beat.PipelineConnector, *common.Config) (cfgfile.Runner, error) +} + +type fakeRunner struct { + Name string + OnStart func() + OnStop func() +} + +func TestCombine_CheckConfig(t *testing.T) { + oops1 := errors.New("oops1") + oops2 := errors.New("oops2") + + cases := map[string]struct { + factory, fallback cfgfile.RunnerFactory + want error + }{ + "success": { + factory: failingRunnerFactory(nil), + fallback: failingRunnerFactory(nil), + want: nil, + }, + "fail if factory fails already": { + factory: failingRunnerFactory(oops1), + fallback: failingRunnerFactory(oops2), + want: oops1, + }, + "do not fail in fallback if factory is fine": { + factory: failingRunnerFactory(nil), + fallback: failingRunnerFactory(oops2), + want: nil, + }, + "ignore ErrUnknownInput and use check from fallback": { + factory: failingRunnerFactory(v2.ErrUnknownInput), + fallback: failingRunnerFactory(oops2), + want: oops2, + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + factory := Combine(test.factory, test.fallback) + cfg := common.MustNewConfigFrom(struct{ Type string }{"test"}) + err := factory.CheckConfig(cfg) + if test.want != err { + t.Fatalf("Failed. Want: %v, Got: %v", test.want, err) + } + }) + } + +} + +func TestCombine_Create(t *testing.T) { + type validation func(*testing.T, cfgfile.Runner, error) + + wantError := func(want error) validation { + return func(t *testing.T, _ cfgfile.Runner, got error) { + if want != got { + t.Fatalf("Wrong error. Want: %v, Got: %v", want, got) + } + } + } + + wantRunner := func(want cfgfile.Runner) validation { + return func(t *testing.T, got cfgfile.Runner, err error) { + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if got != want { + t.Fatalf("Wrong runner. Want: %v Got: %v", want, got) + } + } + } + + runner1 := &fakeRunner{Name: "runner1"} + runner2 := &fakeRunner{Name: "runner2"} + oops1 := errors.New("oops1") + oops2 := errors.New("oops2") + + cases := map[string]struct { + factory cfgfile.RunnerFactory + fallback cfgfile.RunnerFactory + Type string + check validation + }{ + "runner exsits in factory only": { + factory: constRunnerFactory(runner1), + fallback: failingRunnerFactory(oops2), + check: wantRunner(runner1), + }, + "runner exists in fallback only": { + factory: failingRunnerFactory(v2.ErrUnknownInput), + fallback: constRunnerFactory(runner2), + check: wantRunner(runner2), + }, + "runner from factory has higher priority": { + factory: constRunnerFactory(runner1), + fallback: constRunnerFactory(runner2), + check: wantRunner(runner1), + }, + "if both fail return error from factory": { + factory: failingRunnerFactory(oops1), + fallback: failingRunnerFactory(oops2), + check: wantError(oops1), + }, + "ignore ErrUnknown": { + factory: failingRunnerFactory(v2.ErrUnknownInput), + fallback: failingRunnerFactory(oops2), + check: wantError(oops2), + }, + } + + for name, test := range cases { + t.Run(name, func(t *testing.T) { + factory := Combine(test.factory, test.fallback) + cfg := common.MustNewConfigFrom(struct{ Type string }{test.Type}) + runner, err := factory.Create(nil, cfg) + test.check(t, runner, err) + }) + } +} + +// Create creates a new Runner based on the given configuration. +func (f *fakeRunnerFactory) Create(p beat.PipelineConnector, config *common.Config) (cfgfile.Runner, error) { + if f.OnCreate == nil { + return nil, errors.New("not implemented") + } + return f.OnCreate(p, config) +} + +// CheckConfig tests if a confiugation can be used to create an input. If it +// is not possible to create an input using the configuration, an error must +// be returned. +func (f *fakeRunnerFactory) CheckConfig(config *common.Config) error { + if f.OnCheck == nil { + return errors.New("not implemented") + } + return f.OnCheck(config) +} + +func (f *fakeRunner) String() string { return f.Name } +func (f *fakeRunner) Start() { + if f.OnStart != nil { + f.OnStart() + } +} + +func (f *fakeRunner) Stop() { + if f.OnStop != nil { + f.OnStop() + } +} + +func constRunnerFactory(runner cfgfile.Runner) cfgfile.RunnerFactory { + return &fakeRunnerFactory{ + OnCreate: func(_ beat.PipelineConnector, _ *common.Config) (cfgfile.Runner, error) { + return runner, nil + }, + } +} + +func failingRunnerFactory(err error) cfgfile.RunnerFactory { + return &fakeRunnerFactory{ + OnCheck: func(_ *common.Config) error { return err }, + + OnCreate: func(_ beat.PipelineConnector, _ *common.Config) (cfgfile.Runner, error) { + return nil, err + }, + } +}