Skip to content

Commit

Permalink
Filebeat inputs: remove the unused run mode in v2.InputManager.Init (#…
Browse files Browse the repository at this point in the history
…39381)

The "v2" input migration a few years ago included a "mode" parameter that has never been used nontrivially, and today serves only to confuse input developers. This PR removes it.

Every removed instance was one of:
- the constant `v2.ModeRun`
- opaque handoff of a received mode to a function parameter
- an unused parameter
- a parameter required to be equal to `v2.ModeRun`

The other two modes, `v2.ModeTest` and `v2.ModeOther`, were never referenced.
  • Loading branch information
faec authored May 6, 2024
1 parent ff424ea commit dfea840
Show file tree
Hide file tree
Showing 26 changed files with 41 additions and 103 deletions.
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
defer func() {
_ = inputTaskGroup.Stop()
}()
if err := v2InputLoader.Init(&inputTaskGroup, v2.ModeRun); err != nil {
if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
logp.Err("Failed to initialize the input managers: %v", err)
return err
}
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{})
e.t.Helper()
e.grp = unison.TaskGroup{}
manager := e.getManager()
manager.Init(&e.grp, v2.ModeRun)
manager.Init(&e.grp)
c := conf.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
Expand All @@ -107,7 +107,7 @@ func (e *inputTestingEnvironment) mustCreateInput(config map[string]interface{})
func (e *inputTestingEnvironment) createInput(config map[string]interface{}) (v2.Input, error) {
e.grp = unison.TaskGroup{}
manager := e.getManager()
manager.Init(&e.grp, v2.ModeRun)
manager.Init(&e.grp)
c := conf.MustNewConfigFrom(config)
inp, err := manager.Create(c)
if err != nil {
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ func (cim *InputManager) init() error {

// Init starts background processes for deleting old entries from the
// persistent store if mode is ModeRun.
func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error {
if mode != v2.ModeRun {
return nil
}

func (cim *InputManager) Init(group unison.Group) error {
if err := cim.init(); err != nil {
return err
}
Expand Down
6 changes: 1 addition & 5 deletions filebeat/input/v2/input-cursor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,7 @@ func (cim *InputManager) init() error {

// Init starts background processes for deleting old entries from the
// persistent store if mode is ModeRun.
func (cim *InputManager) Init(group unison.Group, mode v2.Mode) error {
if mode != v2.ModeRun {
return nil
}

func (cim *InputManager) Init(group unison.Group) error {
if err := cim.init(); err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/v2/input-cursor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestManager_Init(t *testing.T) {
DefaultCleanTimeout: 10 * time.Millisecond,
}

err := manager.Init(&grp, input.ModeRun)
err := manager.Init(&grp)
require.NoError(t, err)

time.Sleep(200 * time.Millisecond)
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestManager_Init(t *testing.T) {
DefaultCleanTimeout: 10 * time.Millisecond,
}

err := manager.Init(&grp, input.ModeRun)
err := manager.Init(&grp)
require.NoError(t, err)

for len(store.snapshot()) > 0 {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/v2/input-stateless/stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewInputManager(configure func(*conf.C) (Input, error)) InputManager {
}

// Init does nothing. Init is required to fullfil the v2.InputManager interface.
func (m InputManager) Init(_ unison.Group, _ v2.Mode) error { return nil }
func (m InputManager) Init(_ unison.Group) error { return nil }

// Create configures a transient input and ensures that the final input can be used with
// with the filebeat input architecture.
Expand Down
12 changes: 1 addition & 11 deletions filebeat/input/v2/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type InputManager interface {
// Init signals to InputManager to initialize internal resources.
// The mode tells the input manager if the Beat is actually running the inputs or
// if inputs are only configured for testing/validation purposes.
Init(grp unison.Group, mode Mode) error
Init(grp unison.Group) error

// Create builds a new Input instance from the given configuation, or returns
// an error if the configuation is invalid.
Expand All @@ -48,16 +48,6 @@ type InputManager interface {
Create(*conf.C) (Input, error)
}

// Mode tells the InputManager in which mode it is initialized.
type Mode uint8

//go:generate stringer -type Mode -trimprefix Mode
const (
ModeRun Mode = iota
ModeTest
ModeOther
)

// Input is a configured input object that can be used to test or start
// the actual data collection.
type Input interface {
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/v2/internal/inputest/inputest.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// MockInputManager can be used as InputManager replacement in tests that require a new Input Manager.
// The OnInit and OnConfigure functions are executed if the corresponding methods get called.
type MockInputManager struct {
OnInit func(v2.Mode) error
OnInit func() error
OnConfigure InputConfigurer
}

Expand All @@ -47,9 +47,9 @@ type MockInput struct {
}

// Init returns nil if OnInit is not set. Otherwise the return value of OnInit is returned.
func (m *MockInputManager) Init(_ unison.Group, mode v2.Mode) error {
func (m *MockInputManager) Init(_ unison.Group) error {
if m.OnInit != nil {
return m.OnInit(mode)
return m.OnInit()
}
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/v2/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func NewLoader(log *logp.Logger, plugins []Plugin, typeField, defaultType string
}

// Init runs Init on all InputManagers for all plugins known to the loader.
func (l *Loader) Init(group unison.Group, mode Mode) error {
func (l *Loader) Init(group unison.Group) error {
for _, p := range l.registry {
if err := p.Manager.Init(group, mode); err != nil {
if err := p.Manager.Init(group); err != nil {
return err
}
}
Expand Down
10 changes: 5 additions & 5 deletions filebeat/input/v2/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestLoader_New(t *testing.T) {
}

func TestLoader_Init(t *testing.T) {
pluginWithInit := func(name string, fn func(Mode) error) Plugin {
pluginWithInit := func(name string, fn func() error) Plugin {
return Plugin{
Name: name,
Stability: feature.Stable,
Expand All @@ -85,7 +85,7 @@ func TestLoader_Init(t *testing.T) {

t.Run("calls all input managers", func(t *testing.T) {
count := 0
incCountOnInit := func(_ Mode) error { count++; return nil }
incCountOnInit := func() error { count++; return nil }

setup := loaderConfig{
Plugins: []Plugin{
Expand All @@ -94,7 +94,7 @@ func TestLoader_Init(t *testing.T) {
},
}
loader := setup.MustNewLoader()
err := loader.Init(nil, ModeRun)
err := loader.Init(nil)
expectNoError(t, err)
if count != 2 {
t.Errorf("expected init count 2, but got %v", count)
Expand All @@ -103,15 +103,15 @@ func TestLoader_Init(t *testing.T) {

t.Run("stop init on error", func(t *testing.T) {
count := 0
incCountOnInit := func(_ Mode) error { count++; return errors.New("oops") }
incCountOnInit := func() error { count++; return errors.New("oops") }
setup := loaderConfig{
Plugins: []Plugin{
pluginWithInit("a", incCountOnInit),
pluginWithInit("b", incCountOnInit),
},
}
loader := setup.MustNewLoader()
err := loader.Init(nil, ModeRun)
err := loader.Init(nil)
expectError(t, err)
if count != 1 {
t.Errorf("expected init count 1, but got %v", count)
Expand Down
42 changes: 0 additions & 42 deletions filebeat/input/v2/mode_string.go

This file was deleted.

2 changes: 1 addition & 1 deletion filebeat/input/v2/simplemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func ConfigureWith(fn func(*conf.C) (Input, error)) InputManager {

// Init is required to fulfil the input.InputManager interface.
// For the kafka input no special initialization is required.
func (*simpleInputManager) Init(grp unison.Group, m Mode) error { return nil }
func (*simpleInputManager) Init(grp unison.Group) error { return nil }

// Create builds a new Input instance from the given configuration, or returns
// an error if the configuration is invalid.
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/v2/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type fakeInputManager struct {
OnInit func(Mode) error
OnInit func() error
OnConfigure func(*conf.C) (Input, error)
}

Expand All @@ -44,9 +44,9 @@ func makeConfigFakeInput(prototype fakeInput) func(*conf.C) (Input, error) {
}
}

func (m *fakeInputManager) Init(_ unison.Group, mode Mode) error {
func (m *fakeInputManager) Init(_ unison.Group) error {
if m.OnInit != nil {
return m.OnInit(mode)
return m.OnInit()
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Plugin() v2.Plugin {
type cloudwatchInputManager struct {
}

func (im *cloudwatchInputManager) Init(grp unison.Group, mode v2.Mode) error {
func (im *cloudwatchInputManager) Init(grp unison.Group) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type s3InputManager struct {
store beater.StateStore
}

func (im *s3InputManager) Init(grp unison.Group, mode v2.Mode) error {
func (im *s3InputManager) Init(grp unison.Group) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/cel/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type source struct{ cfg config }
func (s *source) Name() string { return s.cfg.Resource.URL.String() }

// Init initializes both wrapped input managers.
func (m InputManager) Init(grp unison.Group, mode v2.Mode) error {
return m.cursor.Init(grp, mode)
func (m InputManager) Init(grp unison.Group) error {
return m.cursor.Init(grp)
}

// Create creates a cursor input manager.
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/entityanalytics/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type manager struct {

// Init is not used for this input. It is called before Create and no provider
// has been configured yet.
func (m *manager) Init(grp unison.Group, mode v2.Mode) error {
func (m *manager) Init(grp unison.Group) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/entityanalytics/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type testProvider struct {
createFn func(c *config.C) (v2.Input, error)
}

func (p *testProvider) Init(grp unison.Group, mode v2.Mode) error {
func (p *testProvider) Init(grp unison.Group) error {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type managerConfig struct {
}

// Init initializes any required resources. It is currently a no-op.
func (m *Manager) Init(grp unison.Group, mode v2.Mode) error {
func (m *Manager) Init(grp unison.Group) error {
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/stretchr/testify/require"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-concert/unison"
Expand Down Expand Up @@ -77,7 +76,7 @@ func TestManager_Init(t *testing.T) {
var grp unison.TaskGroup

m := Manager{}
gotErr := m.Init(&grp, v2.ModeRun)
gotErr := m.Init(&grp)

require.NoError(t, gotErr)
}
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/httpjson/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage
}

// Init initializes both wrapped input managers.
func (m InputManager) Init(grp unison.Group, mode v2.Mode) error {
func (m InputManager) Init(grp unison.Group) error {
return multierr.Append(
m.stateless.Init(grp, mode),
m.cursor.Init(grp, mode),
m.stateless.Init(grp),
m.cursor.Init(grp),
)
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/netflow/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type netflowInputManager struct {
log *logp.Logger
}

func (im *netflowInputManager) Init(_ unison.Group, _ v2.Mode) error {
func (im *netflowInputManager) Init(_ unison.Group) error {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/salesforce/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type source struct{ cfg config }
func (s *source) Name() string { return s.cfg.URL }

// Init initializes both wrapped input managers.
func (m InputManager) Init(grp unison.Group, mode v2.Mode) error {
return m.cursor.Init(grp, mode)
func (m InputManager) Init(grp unison.Group) error {
return m.cursor.Init(grp)
}

// Create creates a cursor input manager.
Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/salesforce/input_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/stretchr/testify/assert"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
Expand Down Expand Up @@ -49,7 +48,7 @@ func TestInputManager(t *testing.T) {
var inputTaskGroup unison.TaskGroup
defer inputTaskGroup.Stop() //nolint:errcheck // ignore error in test

err := inputManager.Init(&inputTaskGroup, v2.ModeRun)
err := inputManager.Init(&inputTaskGroup)
assert.NoError(t, err)

config, err := conf.NewConfigFrom(map[string]interface{}{
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/shipper/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func NewInputManager(log *logp.Logger) *InputManager {

// Init initializes the manager
// not sure if the shipper needs to do anything at this point?
func (im *InputManager) Init(_ unison.Group, _ v2.Mode) error {
func (im *InputManager) Init(_ unison.Group) error {
return nil
}

Expand Down
Loading

0 comments on commit dfea840

Please sign in to comment.